From 4d0d9d61eb443190095a9cd2d9a04e4d5551afc0 Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Wed, 21 Aug 2024 20:56:08 +0530
Subject: [PATCH 1/3] HBASE-28805: Chunked persistence of backing map for persistent bucket cache.

The persistent bucket cache feature relies on persisting the backing map
to a file. The protobuf APIs are used to serialise the backing map and its
related structures into that file, and an asynchronous thread periodically
flushes the contents of the backing map to it.

The protobuf library limits a single protobuf message to 2GB. If the
serialised backing map grows beyond 2GB, an unexpected exception is thrown
in the asynchronous thread, which stops the persister thread. The
persistence file then goes out of sync with the actual bucket cache, so the
bucket cache shrinks to a smaller size after a cache restart and checksum
errors are reported.

This Jira introduces chunking of the backing-map persistence so that every
protobuf message stays smaller than 2GB.

Change-Id: I8623ad2eaf1d1a56f96bc3120b14e5229ae55c42
---
 .../hbase/io/hfile/bucket/BucketCache.java    | 186 +++++++++++++-----
 .../io/hfile/bucket/BucketProtoUtils.java     |  36 +++-
 .../bucket/TestVerifyBucketCacheFile.java     |  47 +++++
 3 files changed, 213 insertions(+), 56 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5816b8ff1602..6d6002e06aae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -77,6 +78,7 @@
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -122,6 +124,7 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
   static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
   static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
+  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = "hbase.bucketcache.persistence.chunksize";
 
   /** Use strong reference for offsetLock or not */
   private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
@@ -145,6 +148,10 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_THREADS = 3;
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
+  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;
+
+  final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };
+
   // Store/read block data
   transient final IOEngine ioEngine;
 
@@ -273,6 +280,8 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private String algorithm;
 
+  private long persistenceChunkSize;
+
   /* Tracing failed Bucket Cache allocations.
*/ private long allocFailLogPrevTs; // time of previous log event for allocation failure. private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute. @@ -313,6 +322,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); + this.persistenceChunkSize = conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, + DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); + if (this.persistenceChunkSize <= 0) { + persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; + } sanityCheckConfigs(); @@ -1358,8 +1372,8 @@ void persistToFile() throws IOException { } File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { - fos.write(ProtobufMagic.PB_MAGIC); - BucketProtoUtils.toPB(this).writeDelimitedTo(fos); + LOG.debug("Persist in new chunked persistence format."); + persistChunkedBackingMap(fos); } catch (IOException e) { LOG.error("Failed to persist bucket cache to file", e); throw e; @@ -1405,16 +1419,23 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { throw new IOException("Incorrect number of bytes read while checking for protobuf magic " + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); } - if (!ProtobufMagic.isPBMagicPrefix(pbuf)) { + if (ProtobufMagic.isPBMagicPrefix(pbuf)) { + LOG.info("Reading old format of persistence."); + // The old non-chunked version of backing map persistence. + parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); + } else if (Arrays.equals(pbuf, PB_MAGIC_V2)) { + // The new persistence format of chunked persistence. + LOG.info("Reading new chunked format of persistence."); + retrieveChunkedBackingMap(in, bucketSizes); + } else { // In 3.0 we have enough flexibility to dump the old cache data. // TODO: In 2.x line, this might need to be filled in to support reading the old format throw new IOException( "Persistence file does not start with protobuf magic number. " + persistencePath); } - parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); blockNumber.add(backingMap.size()); - LOG.info("Bucket cache retrieved from file successfully"); + LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size()); } } @@ -1457,6 +1478,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String } } + private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) { + try { + if (proto.hasChecksum()) { + ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), + algorithm); + } + backingMapValidated.set(true); + } catch (IOException e) { + LOG.warn("Checksum for cache file failed. " + + "We need to validate each cache key in the backing map. 
" + + "This may take some time, so we'll do it in a background thread,"); + + Runnable cacheValidator = () -> { + while (bucketAllocator == null) { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + long startTime = EnvironmentEdgeManager.currentTime(); + int totalKeysOriginally = backingMap.size(); + for (Map.Entry keyEntry : backingMap.entrySet()) { + try { + ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); + } catch (IOException e1) { + LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey()); + evictBlock(keyEntry.getKey()); + fileNotFullyCached(keyEntry.getKey().getHfileName()); + } + } + backingMapValidated.set(true); + LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", + totalKeysOriginally, backingMap.size(), + (EnvironmentEdgeManager.currentTime() - startTime)); + }; + Thread t = new Thread(cacheValidator); + t.setDaemon(true); + t.start(); + } + } + + private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk, + List chunks) throws IOException { + fullyCachedFiles.clear(); + Pair, NavigableSet> pair = + BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(), + this::createRecycler); + backingMap.putAll(pair.getFirst()); + blocksByHFile.addAll(pair.getSecond()); + fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap())); + + LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", + backingMap.size(), fullyCachedFiles.size()); + int i = 1; + for (BucketCacheProtos.BackingMap chunk : chunks) { + Pair, NavigableSet> pair2 = + BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, + this::createRecycler); + backingMap.putAll(pair2.getFirst()); + blocksByHFile.addAll(pair2.getSecond()); + LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", + ++i, backingMap.size(), fullyCachedFiles.size()); + } + verifyFileIntegrity(firstChunk); + verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(), firstChunk.getMapClass()); + updateRegionSizeMapWhileRetrievingFromFile(); + } + private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { Pair, NavigableSet> pair = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), @@ -1465,52 +1555,54 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio blocksByHFile = pair.getSecond(); fullyCachedFiles.clear(); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); - if (proto.hasChecksum()) { - try { - ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), - algorithm); - backingMapValidated.set(true); - } catch (IOException e) { - LOG.warn("Checksum for cache file failed. " - + "We need to validate each cache key in the backing map. " - + "This may take some time, so we'll do it in a background thread,"); - Runnable cacheValidator = () -> { - while (bucketAllocator == null) { - try { - Thread.sleep(50); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - long startTime = EnvironmentEdgeManager.currentTime(); - int totalKeysOriginally = backingMap.size(); - for (Map.Entry keyEntry : backingMap.entrySet()) { - try { - ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue()); - } catch (IOException e1) { - LOG.debug("Check for key {} failed. 
Evicting.", keyEntry.getKey()); - evictBlock(keyEntry.getKey()); - fullyCachedFiles.remove(keyEntry.getKey().getHfileName()); - } - } - backingMapValidated.set(true); - LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.", - totalKeysOriginally, backingMap.size(), - (EnvironmentEdgeManager.currentTime() - startTime)); - }; - Thread t = new Thread(cacheValidator); - t.setDaemon(true); - t.start(); - } - } else { - // if has not checksum, it means the persistence file is old format - LOG.info("Persistent file is old format, it does not support verifying file integrity!"); - backingMapValidated.set(true); - } + verifyFileIntegrity(proto); updateRegionSizeMapWhileRetrievingFromFile(); verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); } + private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { + fos.write(PB_MAGIC_V2); + long numChunks = backingMap.size() / persistenceChunkSize; + if (backingMap.size() % persistenceChunkSize != 0) { + numChunks += 1; + } + + LOG.debug("persistToFile: before persisting backing map size: {}, " + + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}", + backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks); + + fos.write(Bytes.toBytes(persistenceChunkSize)); + fos.write(Bytes.toBytes(numChunks)); + BucketProtoUtils.toPB(this, fos, persistenceChunkSize); + + LOG.debug("persistToFile: after persisting backing map size: {}, " + + "fullycachedFiles size: {}, numChunksPersisteed: {}", + backingMap.size(), fullyCachedFiles.size(), numChunks); + } + + private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException { + byte[] bytes = new byte[Long.BYTES]; + in.read(bytes); + long batchSize = Bytes.toLong(bytes, 0); + in.read(bytes); + long numChunks = Bytes.toLong(bytes, 0); + + LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize); + + ArrayList bucketCacheMaps = new ArrayList<>(); + // Read the first chunk that has all the details. + BucketCacheProtos.BucketCacheEntry firstChunk = + BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in); + + // Subsequent chunks have the backingMap entries. + for (int i = 1; i < numChunks; i++) { + LOG.info("Reading chunk no: {}", i+1); + bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in)); + LOG.info("Retrieved chunk: {}", i+1); + } + parsePB(firstChunk, bucketCacheMaps); + } + /** * Check whether we tolerate IO error this time. 
If the duration of IOEngine throwing errors * exceeds ioErrorsDurationTimeTolerated, we will disable the cache diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 4b42414fb9c5..3b39afa9355c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import java.io.FileOutputStream; import java.io.IOException; import java.util.Comparator; import java.util.HashMap; @@ -45,28 +46,45 @@ private BucketProtoUtils() { } - static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { + static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, BucketCacheProtos.BackingMap backingMap) { return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize()) .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) - .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) + .setBackingMap(backingMap) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) .build(); } - private static BucketCacheProtos.BackingMap toPB(Map backingMap) { + static void toPB(BucketCache cache, FileOutputStream fos, long chunkSize) throws IOException{ + int blockCount = 0; + int chunkCount = 0; + int backingMapSize = cache.backingMap.size(); BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); - for (Map.Entry entry : backingMap.entrySet()) { - builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey())) - .setValue(toPB(entry.getValue())).build()); + for (Map.Entry entry : cache.backingMap.entrySet()) { + blockCount++; + builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder() + .setKey(BucketProtoUtils.toPB(entry.getKey())) + .setValue(BucketProtoUtils.toPB(entry.getValue())).build()); + if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) { + chunkCount++; + if (chunkCount == 1) { + // Persist all details along with the first chunk into BucketCacheEntry + BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos); + } else { + // Directly persist subsequent backing-map chunks. 
+ builder.build().writeDelimitedTo(fos); + } + if (blockCount < backingMapSize) { + builder = BucketCacheProtos.BackingMap.newBuilder(); + } + } } - return builder.build(); } - private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { + static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName()) .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary()) .setBlockType(toPB(key.getBlockType())).build(); @@ -103,7 +121,7 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) { } } - private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { + static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 0d33fb079bcd..c4608d9c2516 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; +import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; @@ -350,6 +351,52 @@ public void testBucketCacheRecovery() throws Exception { TEST_UTIL.cleanupTestDir(); } + @Test + public void testSingleChunk() throws Exception { + testChunkedBackingMapRecovery(5, 5); + } + + @Test + public void testMultipleChunks() throws Exception { + testChunkedBackingMapRecovery(5, 10); + } + + private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception { + HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + Configuration conf = HBaseConfiguration.create(); + conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize); + + String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime(); + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); + + CacheTestUtils.HFileBlockPair[] blocks = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); + + for (int i = 0; i < numBlocks; i++) { + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock()); + } + + // saves the current state + bucketCache.persistToFile(); + + // Create a new bucket which reads from persistence file. 
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, + DEFAULT_ERROR_TOLERATION_DURATION, conf); + + assertEquals(numBlocks, newBucketCache.backingMap.size()); + + for (int i = 0; i < numBlocks; i++) { + assertEquals(blocks[i].getBlock(), + newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false)); + } + TEST_UTIL.cleanupTestDir(); + } + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { From e6760420311cf9821f8be0459e61f5c295069542 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 29 Aug 2024 20:41:27 +0530 Subject: [PATCH 2/3] HBASE-28805: Address review comments. Change-Id: I3eaddcef94a711e63eeefd47063f074d0a57c984 --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 9 ++------- .../hbase/io/hfile/bucket/BucketProtoUtils.java | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 6d6002e06aae..a186dbfa2553 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -150,8 +150,6 @@ public class BucketCache implements BlockCache, HeapSize { final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000; - final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; - // Store/read block data transient final IOEngine ioEngine; @@ -1423,7 +1421,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { LOG.info("Reading old format of persistence."); // The old non-chunked version of backing map persistence. parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); - } else if (Arrays.equals(pbuf, PB_MAGIC_V2)) { + } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) { // The new persistence format of chunked persistence. 
LOG.info("Reading new chunked format of persistence."); retrieveChunkedBackingMap(in, bucketSizes); @@ -1561,7 +1559,6 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio } private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { - fos.write(PB_MAGIC_V2); long numChunks = backingMap.size() / persistenceChunkSize; if (backingMap.size() % persistenceChunkSize != 0) { numChunks += 1; @@ -1571,9 +1568,7 @@ private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}", backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks); - fos.write(Bytes.toBytes(persistenceChunkSize)); - fos.write(Bytes.toBytes(numChunks)); - BucketProtoUtils.toPB(this, fos, persistenceChunkSize); + BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks); LOG.debug("persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}, numChunksPersisteed: {}", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index 3b39afa9355c..f0576908e942 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -42,6 +43,8 @@ @InterfaceAudience.Private final class BucketProtoUtils { + + final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; private BucketProtoUtils() { } @@ -58,11 +61,17 @@ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, BucketCachePro .build(); } - static void toPB(BucketCache cache, FileOutputStream fos, long chunkSize) throws IOException{ + public static void serializeAsPB(BucketCache cache, FileOutputStream fos, + long chunkSize, long numChunks) throws IOException{ int blockCount = 0; int chunkCount = 0; int backingMapSize = cache.backingMap.size(); BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); + + fos.write(PB_MAGIC_V2); + fos.write(Bytes.toBytes(chunkSize)); + fos.write(Bytes.toBytes(numChunks)); + for (Map.Entry entry : cache.backingMap.entrySet()) { blockCount++; builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder() @@ -84,7 +93,7 @@ static void toPB(BucketCache cache, FileOutputStream fos, long chunkSize) throws } } - static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { + private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName()) .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary()) .setBlockType(toPB(key.getBlockType())).build(); @@ -121,7 +130,7 @@ private static BucketCacheProtos.BlockType toPB(BlockType blockType) { } } - static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { + private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) 
.setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) From f853dbc710c878da2d53981cec9e59b87ececf2f Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 29 Aug 2024 21:12:40 +0530 Subject: [PATCH 3/3] HBASE-28805: Fix spotless errors and spot bugs. Change-Id: I082a83ef841125eed00d3ccd660bd2beb95dc4f9 --- .../hbase/io/hfile/bucket/BucketCache.java | 43 +++++++++++-------- .../io/hfile/bucket/BucketProtoUtils.java | 19 ++++---- .../bucket/TestVerifyBucketCacheFile.java | 2 +- 3 files changed, 37 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index a186dbfa2553..10d0c925a47a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -124,7 +123,8 @@ public class BucketCache implements BlockCache, HeapSize { static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; - static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = "hbase.bucketcache.persistence.chunksize"; + static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = + "hbase.bucketcache.persistence.chunksize"; /** Use strong reference for offsetLock or not */ private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref"; @@ -320,8 +320,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000); - this.persistenceChunkSize = conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, - DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); + this.persistenceChunkSize = + conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE); if (this.persistenceChunkSize <= 0) { persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE; } @@ -1528,20 +1528,20 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk, blocksByHFile.addAll(pair.getSecond()); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap())); - LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", - backingMap.size(), fullyCachedFiles.size()); + LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", backingMap.size(), + fullyCachedFiles.size()); int i = 1; for (BucketCacheProtos.BackingMap chunk : chunks) { Pair, NavigableSet> pair2 = - BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, - this::createRecycler); + BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler); backingMap.putAll(pair2.getFirst()); blocksByHFile.addAll(pair2.getSecond()); - LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", - ++i, 
backingMap.size(), fullyCachedFiles.size()); + LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", ++i, + backingMap.size(), fullyCachedFiles.size()); } verifyFileIntegrity(firstChunk); - verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(), firstChunk.getMapClass()); + verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(), + firstChunk.getMapClass()); updateRegionSizeMapWhileRetrievingFromFile(); } @@ -1564,22 +1564,31 @@ private void persistChunkedBackingMap(FileOutputStream fos) throws IOException { numChunks += 1; } - LOG.debug("persistToFile: before persisting backing map size: {}, " + LOG.debug( + "persistToFile: before persisting backing map size: {}, " + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}", backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks); BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks); - LOG.debug("persistToFile: after persisting backing map size: {}, " + LOG.debug( + "persistToFile: after persisting backing map size: {}, " + "fullycachedFiles size: {}, numChunksPersisteed: {}", backingMap.size(), fullyCachedFiles.size(), numChunks); } private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException { byte[] bytes = new byte[Long.BYTES]; - in.read(bytes); + int readSize = in.read(bytes); + if (readSize != Long.BYTES) { + throw new IOException("Invalid size of chunk-size read from persistence: " + readSize); + } long batchSize = Bytes.toLong(bytes, 0); - in.read(bytes); + + readSize = in.read(bytes); + if (readSize != Long.BYTES) { + throw new IOException("Invalid size for number of chunks read from persistence: " + readSize); + } long numChunks = Bytes.toLong(bytes, 0); LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize); @@ -1591,9 +1600,9 @@ private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) th // Subsequent chunks have the backingMap entries. 
for (int i = 1; i < numChunks; i++) { - LOG.info("Reading chunk no: {}", i+1); + LOG.info("Reading chunk no: {}", i + 1); bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in)); - LOG.info("Retrieved chunk: {}", i+1); + LOG.info("Retrieved chunk: {}", i + 1); } parsePB(firstChunk, bucketCacheMaps); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index f0576908e942..4618200325c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -44,25 +44,26 @@ @InterfaceAudience.Private final class BucketProtoUtils { - final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; + final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; + private BucketProtoUtils() { } - static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, BucketCacheProtos.BackingMap backingMap) { + static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, + BucketCacheProtos.BackingMap backingMap) { return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize()) .setIoClass(cache.ioEngine.getClass().getName()) .setMapClass(cache.backingMap.getClass().getName()) .putAllDeserializers(CacheableDeserializerIdManager.save()) - .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) - .setBackingMap(backingMap) + .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)).setBackingMap(backingMap) .setChecksum(ByteString .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) .build(); } - public static void serializeAsPB(BucketCache cache, FileOutputStream fos, - long chunkSize, long numChunks) throws IOException{ + public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize, + long numChunks) throws IOException { int blockCount = 0; int chunkCount = 0; int backingMapSize = cache.backingMap.size(); @@ -74,9 +75,9 @@ public static void serializeAsPB(BucketCache cache, FileOutputStream fos, for (Map.Entry entry : cache.backingMap.entrySet()) { blockCount++; - builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder() - .setKey(BucketProtoUtils.toPB(entry.getKey())) - .setValue(BucketProtoUtils.toPB(entry.getValue())).build()); + builder.addEntry( + BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey())) + .setValue(BucketProtoUtils.toPB(entry.getValue())).build()); if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) { chunkCount++; if (chunkCount == 1) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index c4608d9c2516..b49a2b1db8d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY; -import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static 
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE; +import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull;
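
Reviewer note (not part of the patches): below is a minimal, illustrative reader for the chunked persistence layout this series introduces. The read order mirrors BucketCache#retrieveChunkedBackingMap and BucketProtoUtils.serializeAsPB: a 4-byte V2 magic ('V', '2', 'U', 'F'), two 8-byte longs (chunk size and number of chunks), one delimited BucketCacheEntry message carrying the metadata plus the first backing-map chunk, then numChunks - 1 delimited BackingMap messages, each kept well below protobuf's 2GB message limit. The class and method names are hypothetical, and the sketch assumes the generated BucketCacheProtos classes under org.apache.hadoop.hbase.shaded.protobuf.generated; it is a sketch of the on-disk format, not code added by these patches.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

// Hypothetical helper, for illustration only; not added by these patches.
public final class ChunkedPersistenceSketch {
  static void dumpChunkSummary(String persistencePath) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(persistencePath))) {
      byte[] magic = new byte[4];
      in.readFully(magic); // expected to match PB_MAGIC_V2, i.e. {'V', '2', 'U', 'F'}
      long chunkSize = in.readLong(); // written via Bytes.toBytes(long), big-endian
      long numChunks = in.readLong();
      // First chunk: a full BucketCacheEntry with deserializers, cached files,
      // checksum and the first slice of the backing map.
      BucketCacheProtos.BucketCacheEntry first =
        BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
      // Remaining chunks: bare BackingMap slices of at most chunkSize entries each.
      List<BucketCacheProtos.BackingMap> rest = new ArrayList<>();
      for (long i = 1; i < numChunks; i++) {
        rest.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
      }
      System.out.println("chunkSize=" + chunkSize + ", numChunks=" + numChunks
        + ", entriesInFirstChunk=" + first.getBackingMap().getEntryCount()
        + ", additionalChunksRead=" + rest.size());
    }
  }
}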