From 6364fb706a2d25eda885168c33c488c4e8cbf4c9 Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Tue, 6 Aug 2024 16:04:33 +0530
Subject: [PATCH 1/9] HBASE-28804: Implement asynchronous retrieval of bucket-cache data from persistence

During the retrieval of data from the bucket cache persistence file, a
transient structure that stores the blocks ordered by filename is
constructed from the backing-map entries. This transient structure is
populated during server start-up, which increases the region-server
startup time when the bucket cache holds a large number of blocks. The
population happens inline with the server restart and can block the
server for several minutes, making restarts inconvenient for external
users; restarts during upgrades can even run into timeout issues because
of this delay.

Hence, this Jira makes the cache retrieval asynchronous with respect to
server startup. During startup, a new thread is spawned that reads the
persistence file and recreates the required structures from it. The
server continues with the restart and does not wait for the bucket-cache
initialization to complete. Note that the bucket cache is not available
immediately; it is ready for use only after the data has been repopulated
from persistence into memory.

Prefetch threads that may start before the bucket cache is initialized
are modified to wait until the bucket cache initialization completes.

Change-Id: I2c136d7b1d884f74642d29923172a1ad4ada36e4
---
 .../balancer/CacheAwareLoadBalancer.java      |  12 +-
 .../hadoop/hbase/io/hfile/BlockCache.java     |  16 ++
 .../hbase/io/hfile/CombinedBlockCache.java    |  12 ++
 .../hbase/io/hfile/HFilePreadReader.java      |   6 +-
 .../hbase/io/hfile/bucket/BucketCache.java    | 165 ++++++++++++++----
 .../io/hfile/bucket/BucketCachePersister.java |   2 +
 .../hadoop/hbase/regionserver/HRegion.java    |   2 +-
 .../hadoop/hbase/io/hfile/CacheTestUtils.java |   4 +
 .../TestBlockEvictionOnRegionMovement.java    |   1 +
 .../hadoop/hbase/io/hfile/TestPrefetch.java   |  24 ++-
 .../io/hfile/bucket/TestBucketCache.java      |  31 +++-
 .../hfile/bucket/TestBucketWriterThread.java  |   2 +
 .../hfile/bucket/TestPrefetchPersistence.java |   3 +
 .../TestRecoveryPersistentBucketCache.java    |  19 +-
 .../bucket/TestVerifyBucketCacheFile.java     |  15 +-
 15 files changed, 256 insertions(+), 58 deletions(-)

diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index d73769a3971b..bc31a317aebd 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -28,6 +28,7 @@
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;

+import java.text.DecimalFormat;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -221,6 +222,9 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn
       return false;
     }

+    DecimalFormat df = new DecimalFormat("#");
+    df.setMaximumFractionDigits(4);
+
     float cacheRatioDiffThreshold = 0.6f;

     // Conditions for moving the region
@@ -240,7 +244,7 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn
         LOG.debug(
           "Region {} moved from {} to {} as the region is cached {} equally on both servers",
          cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
-          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+          cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer));
       }
       return true;
     }
@@ -257,7 +261,8 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn
           "Region {} moved from {} to {} as region cache ratio {} is better than the current "
             + "cache ratio {}",
           cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
-          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+          cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer),
+          df.format(cacheRatioOnOldServer));
       }
       return true;
     }
@@ -266,7 +271,8 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn
       LOG.debug(
         "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
         cluster.regions[regionIndex], cluster.servers[currentServerIndex],
-        cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+        cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer),
+        df.format(cacheRatioOnOldServer));
     }
     return false;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 028a80075b5a..d14bd18f289c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -261,4 +261,20 @@ default Optional<Map<String, Long>> getRegionCachedInfo() {
   default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
     return 0;
   }
+
+  /**
+   * API to check whether the cache is enabled.
+   * @return true if the cache is enabled, false otherwise.
+   */
+  default boolean isCacheEnabled() {
+    return true;
+  }
+
+  /**
+   * Wait for the bucket cache to be enabled during server restart.
+   * @param timeout time to wait for the bucket cache to be enabled
+   * @return true if the bucket cache is enabled, false otherwise
+   */
+  default boolean waitForCacheInitialization(long timeout) { return true; }
 }
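For readers following the series, a minimal, runnable sketch of the initialization contract the two default methods above establish. This is an illustration only, not part of the patch: the CacheInitSketch class and its main() harness are hypothetical, while CacheState, isCacheEnabled() and waitForCacheInitialization(long) mirror names introduced by this change, and the 100 ms poll interval matches the BucketCache implementation further down in the diff.

// Hypothetical, self-contained model of the asynchronous-initialization
// contract: a background retriever thread flips the cache from INITIALIZING
// to ENABLED while callers poll with a bounded wait.
public class CacheInitSketch {
  enum CacheState {
    INITIALIZING,
    ENABLED,
    DISABLED
  }

  private volatile CacheState cacheState = CacheState.INITIALIZING;

  public boolean isCacheEnabled() {
    return cacheState == CacheState.ENABLED;
  }

  // Poll every 100 ms until background recovery finishes or the timeout
  // elapses, then report whether the cache actually came up.
  public boolean waitForCacheInitialization(long timeoutMs) {
    while (cacheState == CacheState.INITIALIZING && timeoutMs > 0) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      timeoutMs -= 100;
    }
    return isCacheEnabled();
  }

  public static void main(String[] args) {
    CacheInitSketch cache = new CacheInitSketch();
    // Stand-in for the persistence-retriever thread the BucketCache
    // constructor spawns: recovery happens off the startup path.
    new Thread(() -> {
      try {
        Thread.sleep(300); // simulate reading the persistence file
      } catch (InterruptedException ignored) {
      }
      cache.cacheState = CacheState.ENABLED;
    }).start();
    System.out.println("cache enabled: " + cache.waitForCacheInitialization(10_000));
  }
}

The volatile state field is what lets the retriever thread publish ENABLED safely to concurrent readers without additional locking.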
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 06bf2a76f756..190a47fbf87a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -507,4 +507,16 @@ public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long e
     return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset)
       + l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
   }
+
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    return this.l1Cache.waitForCacheInitialization(timeout)
+      && this.l2Cache.waitForCacheInitialization(timeout);
+  }
+
+  @Override
+  public boolean isCacheEnabled() {
+    return l1Cache.isCacheEnabled()
+      && l2Cache.isCacheEnabled();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index e6b79cc55cca..06d268df3329 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -34,12 +34,14 @@ public class HFilePreadReader extends HFileReaderImpl {
   private static final Logger LOG = LoggerFactory.getLogger(HFileReaderImpl.class);
+  private static final int WAIT_TIME_FOR_CACHE_INITIALIZATION = 10 * 60 * 1000;

   public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
     Configuration conf) throws IOException {
     super(context, fileInfo, cacheConf, conf);
     final MutableBoolean shouldCache = new MutableBoolean(true);

     cacheConf.getBlockCache().ifPresent(cache -> {
+      cache.waitForCacheInitialization(WAIT_TIME_FOR_CACHE_INITIALIZATION);
       Optional<Boolean> result = cache.shouldCacheFile(path.getName());
       shouldCache.setValue(result.isPresent() ? result.get().booleanValue() : true);
     });
@@ -110,8 +112,8 @@ public void run() {
             if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
               LOG.warn(
                 "Interrupting prefetch for file {} because block {} of size {} "
-                  + "doesn't fit in the available cache space.",
-                path, cacheKey, block.getOnDiskSizeWithHeader());
+                  + "doesn't fit in the available cache space. 
isCacheEnabled: {}", + path, cacheKey, block.getOnDiskSizeWithHeader(), cache.isCacheEnabled()); interrupted = true; break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 10d0c925a47a..5585e676ea6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -175,11 +175,22 @@ public class BucketCache implements BlockCache, HeapSize { private BucketCachePersister cachePersister; + /** + * Enum to represent the state of cache + */ + private enum CacheState { + // Initializing: State when the cache is being initialised from persistence. + INITIALIZING, + // Enabled: State when cache is initialised and is ready. + ENABLED, + // Disabled: State when the cache is disabled. + DISABLED + } /** * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so * that Bucket IO exceptions/errors don't bring down the HBase server. */ - private volatile boolean cacheEnabled; + private volatile CacheState cacheState; /** * A list of writer queues. We have a queue per {@link WriterThread} we have running. In other @@ -337,6 +348,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.persistencePath = persistencePath; this.blockSize = blockSize; this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; + this.cacheState = CacheState.INITIALIZING; this.allocFailLogPrevTs = 0; @@ -348,32 +360,40 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.ramCache = new RAMCache(); this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); + instantiateWriterThreads(); if (isCachePersistent()) { if (ioEngine instanceof FileIOEngine) { startBucketCachePersisterThread(); } - try { - retrieveFromFile(bucketSizes); - } catch (IOException ioex) { - LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); - backingMap.clear(); - fullyCachedFiles.clear(); - backingMapValidated.set(true); - bucketAllocator = new BucketAllocator(capacity, bucketSizes); - regionCachedSize.clear(); - } + + Runnable persistentCacheRetriever = () -> { + try { + retrieveFromFile(bucketSizes); + LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); + } catch (IOException ioex) { + LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); + backingMap.clear(); + fullyCachedFiles.clear(); + backingMapValidated.set(true); + try { + bucketAllocator = new BucketAllocator(capacity, bucketSizes); + } catch (BucketAllocatorException ex) { + LOG.error("Exception during Bucket Allocation", ex); + } + regionCachedSize.clear(); + } finally { + this.cacheState = CacheState.ENABLED; + startWriterThreads(); + } + }; + Thread t = new Thread(persistentCacheRetriever); + t.start(); } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); + this.cacheState = CacheState.ENABLED;; + startWriterThreads(); } - final String threadName = Thread.currentThread().getName(); - this.cacheEnabled = true; - for (int i = 0; i < writerThreads.length; ++i) { - writerThreads[i] = new WriterThread(writerQueues.get(i)); - writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); - writerThreads[i].setDaemon(true); - } - startWriterThreads(); // Run the statistics thread periodically to print the cache statistics 
log // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log @@ -383,7 +403,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity=" + StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" - + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); + + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); } private void sanityCheckConfigs() { @@ -406,6 +426,18 @@ private void sanityCheckConfigs() { + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); } + /** + * Called by the constructor to instantiate the writer threads. + */ + private void instantiateWriterThreads() { + final String threadName = Thread.currentThread().getName(); + for (int i = 0; i < this.writerThreads.length; ++i) { + this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); + this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + this.writerThreads[i].setDaemon(true); + } + } + /** * Called by the constructor to start the writer threads. Used by tests that need to override * starting the threads. @@ -423,8 +455,9 @@ void startBucketCachePersisterThread() { cachePersister.start(); } - boolean isCacheEnabled() { - return this.cacheEnabled; + @Override + public boolean isCacheEnabled() { + return this.cacheState == CacheState.ENABLED; } @Override @@ -513,7 +546,7 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inM */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (cacheEnabled) { + if (isCacheEnabled()) { if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { BucketEntry bucketEntry = backingMap.get(cacheKey); @@ -535,7 +568,7 @@ protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cachea protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return; } if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) { @@ -611,7 +644,7 @@ public BucketEntry getBlockForReference(BlockCacheKey key) { @Override public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return null; } RAMQueueEntry re = ramCache.get(key); @@ -775,7 +808,7 @@ public boolean evictBlock(BlockCacheKey cacheKey) { */ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess) { - if (!cacheEnabled) { + if (!isCacheEnabled()) { return false; } boolean existedInRamCache = removeFromRamCache(cacheKey); @@ -885,6 +918,8 @@ public void run() { } public void logStats() { + if (!isCacheInitialized("BucketCache::logStats")) return; + long totalSize = bucketAllocator.getTotalSize(); long usedSize = bucketAllocator.getUsedSize(); long freeSize = totalSize - usedSize; @@ -917,10 +952,17 @@ public long getRealCacheSize() { } public long acceptableSize() { + if (!isCacheInitialized("BucketCache::acceptableSize")) { + return 0; + } return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); } long getPartitionSize(float partitionFactor) 
{ + if (!isCacheInitialized("BucketCache::getPartitionSize")) { + return 0; + } + return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); } @@ -928,6 +970,10 @@ long getPartitionSize(float partitionFactor) { * Return the count of bucketSizeinfos still need free space */ private int bucketSizesAboveThresholdCount(float minFactor) { + if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) { + return 0; + } + BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); int fullCount = 0; for (int i = 0; i < stats.length; i++) { @@ -948,6 +994,10 @@ private int bucketSizesAboveThresholdCount(float minFactor) { * @param completelyFreeBucketsNeeded number of buckets to free **/ private void freeEntireBuckets(int completelyFreeBucketsNeeded) { + if (!isCacheInitialized("BucketCache::freeEntireBuckets")) { + return; + } + if (completelyFreeBucketsNeeded != 0) { // First we will build a set where the offsets are reference counted, usually // this set is small around O(Handler Count) unless something else is wrong @@ -974,6 +1024,9 @@ private void freeEntireBuckets(int completelyFreeBucketsNeeded) { * @param why Why we are being called */ void freeSpace(final String why) { + if (!isCacheInitialized("BucketCache::freeSpace")) { + return; + } // Ensure only one freeSpace progress at a time if (!freeSpaceLock.tryLock()) { return; @@ -1129,13 +1182,13 @@ void disableWriter() { public void run() { List entries = new ArrayList<>(); try { - while (cacheEnabled && writerEnabled) { + while (isCacheEnabled() && writerEnabled) { try { try { // Blocks entries = getRAMQueueEntries(inputQueue, entries); } catch (InterruptedException ie) { - if (!cacheEnabled || !writerEnabled) { + if (!isCacheEnabled() || !writerEnabled) { break; } } @@ -1147,7 +1200,7 @@ public void run() { } catch (Throwable t) { LOG.warn("Failed doing drain", t); } - LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); + LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled()); } } @@ -1238,7 +1291,7 @@ void doDrain(final List entries, ByteBuffer metaBuff) throws Inte // Index updated inside loop if success or if we can't succeed. We retry if cache is full // when we go to add an entry by going around the loop again without upping the index. 
int index = 0; - while (cacheEnabled && index < size) { + while (isCacheEnabled() && index < size) { RAMQueueEntry re = null; try { re = entries.get(index); @@ -1371,10 +1424,17 @@ void persistToFile() throws IOException { File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime()); try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) { LOG.debug("Persist in new chunked persistence format."); + persistChunkedBackingMap(fos); + + LOG.debug("PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}, file name: {}", + backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); } catch (IOException e) { LOG.error("Failed to persist bucket cache to file", e); throw e; + } catch (Throwable e) { + LOG.error("Failed during persist bucket cache to file: ", e); + throw e; } LOG.debug("Thread {} finished persisting bucket cache to file, renaming", Thread.currentThread().getName()); @@ -1407,7 +1467,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException { backingMapValidated.set(true); return; } - assert !cacheEnabled; + assert !isCacheEnabled(); try (FileInputStream in = new FileInputStream(persistenceFile)) { int pblen = ProtobufMagic.lengthOfPBMagic(); @@ -1553,6 +1613,10 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio blocksByHFile = pair.getSecond(); fullyCachedFiles.clear(); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); + + LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", + backingMap.size(), fullyCachedFiles.size()); + verifyFileIntegrity(proto); updateRegionSizeMapWhileRetrievingFromFile(); verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); @@ -1616,7 +1680,7 @@ private void checkIOErrorIsTolerated() { // Do a single read to a local variable to avoid timing issue - HBASE-24454 long ioErrorStartTimeTmp = this.ioErrorStartTime; if (ioErrorStartTimeTmp > 0) { - if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { + if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + "ms, disabling cache, please check your IOEngine"); disableCache(); @@ -1630,9 +1694,9 @@ private void checkIOErrorIsTolerated() { * Used to shut down the cache -or- turn it off in the case of something broken. 
 */
  private void disableCache() {
-    if (!cacheEnabled) return;
+    if (!isCacheEnabled()) return;
    LOG.info("Disabling cache");
-    cacheEnabled = false;
+    cacheState = CacheState.DISABLED;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i)
@@ -1713,6 +1777,9 @@ public long getCurrentDataSize() {

   @Override
   public long getFreeSize() {
+    if (!isCacheInitialized("BucketCache:getFreeSize")) {
+      return 0;
+    }
     return this.bucketAllocator.getFreeSize();
   }
@@ -1728,6 +1795,9 @@ public long getDataBlockCount() {

   @Override
   public long getCurrentSize() {
+    if (!isCacheInitialized("BucketCache::getCurrentSize")) {
+      return 0;
+    }
     return this.bucketAllocator.getUsedSize();
   }
@@ -2234,6 +2304,10 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d

   @Override
   public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    if (!isCacheInitialized("blockFitsIntoTheCache")) {
+      return Optional.of(false);
+    }
+
     long currentUsed = bucketAllocator.getUsedSize();
     boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
     return Optional.of(result);
@@ -2266,4 +2340,27 @@ public Optional<Integer> getBlockSize(BlockCacheKey key) {
     }
   }
+
+  boolean isCacheInitialized(String api) {
+    if (cacheState == CacheState.INITIALIZING) {
+      LOG.warn("Bucket initialisation pending at {}", api);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    try {
+      while (cacheState == CacheState.INITIALIZING) {
+        if (timeout <= 0) {
+          break;
+        }
+        Thread.sleep(100);
+        timeout -= 100;
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt instead of returning from a finally block,
+      // which would silently discard it.
+      Thread.currentThread().interrupt();
+    }
+    return isCacheEnabled();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
index e4382d2561e6..2039debeef96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -62,6 +62,8 @@ public void run() {
       LOG.info("Finishing cache persister thread.");
     } catch (InterruptedException e) {
       LOG.warn("Interrupting BucketCachePersister thread.", e);
+    } catch (Throwable e) {
+      LOG.error("Failed during persisting bucket cache to file: ", e);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0750d0b394fc..e41b63489c13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1644,7 +1644,7 @@ public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus,
     // Only allow one thread to close at a time. Serialize them so dual
     // threads attempting to close will run up against each other.
     MonitoredTask status = TaskMonitor.get().createStatus(
-      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""),
+      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? 
" due to abort" : " as it is being closed"), ignoreStatus, true); status.setStatus("Waiting for close lock"); try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index 848f33bb9c3a..fb6cd77c270a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -351,4 +351,8 @@ public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, } } } + + public static boolean waitForCacheInitialization(BlockCache cache, long timeout) { + return cache.waitForCacheInitialization(timeout); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java index 7303cf53a55a..3002383700e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -133,6 +133,7 @@ public void testBlockEvictionOnGracefulStop() throws Exception { cluster.startRegionServer(); Thread.sleep(500); + CacheTestUtils.waitForCacheInitialization(regionServingRS.getBlockCache().get(), 10000); long newUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); assertEquals(oldUsedCacheSize, newUsedCacheSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index b172202c8d4a..1a8d161da555 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -108,11 +108,13 @@ public class TestPrefetch { public OpenTelemetryRule otelRule = OpenTelemetryRule.create(); @Before - public void setUp() throws IOException { + public void setUp() throws IOException, InterruptedException { conf = TEST_UTIL.getConfiguration(); conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); fs = HFileSystem.get(conf); blockCache = BlockCacheFactory.createBlockCache(conf); + // Add some sleep to enable the cache to be instantiated. + Thread.sleep(2000); cacheConf = new CacheConfig(conf, blockCache); } @@ -364,16 +366,20 @@ public void testPrefetchWithDelay() throws Exception { // Wait for 20 seconds, no thread should start prefetch Thread.sleep(20000); assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted()); - while (!reader.prefetchStarted()) { - assertTrue("Prefetch delay has not been expired yet", - getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay()); - } - if (reader.prefetchStarted()) { - // Added some delay as we have started the timer a bit late. + long timeout = 10000; + while (!reader.prefetchStarted() && !reader.prefetchComplete()) { + // Wait until the prefetch is triggered. Thread.sleep(500); - assertTrue("Prefetch should start post configured delay", - getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay()); + if (timeout <= 0) break; + timeout -= 500; } + assertTrue(reader.prefetchStarted() || reader.prefetchComplete()); + + // Added some delay as we have started the timer a bit late. 
+ Thread.sleep(500); + assertTrue("Prefetch should start post configured delay", + getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay()); + conf.setInt(PREFETCH_DELAY, 1000); conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE); prefetchExecutorNotifier.onConfigurationChange(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 78a781994e83..7730d15c8ca1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -226,10 +226,19 @@ public void testHeapSizeChanges() throws Exception { public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); + long timeout = 120000; + try { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + if (timeout <= 0) { + break; + } + timeout -= 100; + } + } finally { + Thread.sleep(1000); } - Thread.sleep(1000); + assertTrue (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)); } public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { @@ -303,6 +312,7 @@ public void testRetrieveFromFile() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + bucketCache.waitForCacheInitialization(10000); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -330,6 +340,7 @@ public void testRetrieveFromPMem() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + bucketCache.waitForCacheInitialization(10000); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -347,6 +358,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + bucketCache.waitForCacheInitialization(10000); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); @@ -363,6 +375,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) assertTrue(new File(persistencePath).exists()); bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + bucketCache.waitForCacheInitialization(10000); + assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); } finally { if (bucketCache != null) { @@ -400,6 +414,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); + bucketCache.waitForCacheInitialization(10000); assertFalse(new 
File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -413,6 +428,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { public void testRetrieveFromFileWithoutPersistence() throws Exception { BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); + bucketCache.waitForCacheInitialization(10000); try { final Path testDir = createAndGetTestDir(); String ioEngineName = "file:" + testDir + "/bucket.cache"; @@ -431,6 +447,7 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception { bucketCache.shutdown(); bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); + bucketCache.waitForCacheInitialization(10000); assertEquals(0, bucketCache.getAllocator().getUsedSize()); } finally { bucketCache.shutdown(); @@ -460,6 +477,7 @@ public void testGetPartitionSize() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + CacheTestUtils.waitForCacheInitialization(cache, 10000); validateGetPartitionSize(cache, 0.1f, 0.5f); validateGetPartitionSize(cache, 0.7f, 0.5f); @@ -497,6 +515,7 @@ public void testValidBucketCacheConfigs() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + CacheTestUtils.waitForCacheInitialization(cache, 10000); assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, cache.getAcceptableFactor(), 0); @@ -570,6 +589,7 @@ private void checkConfigValues(Configuration conf, Map configMa } BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); + CacheTestUtils.waitForCacheInitialization(cache, 10000); assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); } catch (IllegalArgumentException e) { @@ -796,6 +816,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -819,6 +840,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -844,6 +866,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, persistencePath); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -875,6 +898,7 @@ public void testBlockAdditionWaitWhenCache() 
throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -931,6 +955,7 @@ private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlock String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, null); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); HFileBlockPair[] hfileBlockPairs = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 429fffa38f6c..6b03f466ac36 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; @@ -84,6 +85,7 @@ public void setUp() throws Exception { final int writerThreadsCount = 1; this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, capacity, null, 100/* Tolerate ioerrors for 100ms */); + CacheTestUtils.waitForCacheInitialization(this.bc, 10000); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. 
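A side note on the test updates above: they replace bare Thread.sleep() calls with bounded waits on cache initialization. A generic helper in that spirit might look like the sketch below; it is hypothetical (WaitUtil is not part of HBase), though a later revision in this series switches TestPrefetch to the similar org.apache.hadoop.hbase.Waiter.waitFor utility.

// Hypothetical polling helper: wait for a condition with a deadline instead
// of a fixed sleep, so slow cache recovery fails loudly rather than flakily.
import java.util.function.BooleanSupplier;

public final class WaitUtil {
  private WaitUtil() {
  }

  public static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs)
    throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(intervalMs);
    }
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Becomes true after ~300 ms; the wait returns as soon as it flips.
    boolean ok = waitFor(() -> System.currentTimeMillis() - start > 300, 10_000, 100);
    System.out.println("condition met: " + ok);
  }
}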
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 4da2d5af9232..c1cd8d96e4ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; @@ -109,6 +110,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); cacheConf = new CacheConfig(conf, bucketCache); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -127,6 +129,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); + CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); cacheConf = new CacheConfig(conf, bucketCache); assertTrue(usedSize != 0); assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 63ff334826d6..e1fd70254583 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -60,6 +61,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 1000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -95,7 +97,8 @@ public void testBucketCacheRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - Thread.sleep(100); + assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + assertEquals(3, newBucketCache.backingMap.size()); assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), 
false, false, false)); assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false)); @@ -120,6 +123,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -134,7 +138,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - Thread.sleep(100); + assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); assertEquals(4, newBucketCache.backingMap.size()); newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName()); assertEquals(3, newBucketCache.backingMap.size()); @@ -143,7 +147,16 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + try { + long timeout = 120000; + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + if (timeout <= 0) { + break; + } + timeout -= 100; + } + } finally { Thread.sleep(100); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index b49a2b1db8d2..05951f77dd86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -100,6 +100,7 @@ public void testRetrieveFromFile() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); CacheTestUtils.HFileBlockPair[] blocks = @@ -116,6 +117,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); // persist cache to file bucketCache.shutdown(); @@ -128,7 +130,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, 
bucketCache.backingMap.size()); // Add blocks @@ -148,7 +150,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -166,6 +168,7 @@ public void testRetrieveFromFileAfterDelete() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -187,6 +190,7 @@ public void testRetrieveFromFileAfterDelete() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -210,6 +214,7 @@ public void testModifiedBucketCacheFileData() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -234,7 +239,7 @@ public void testModifiedBucketCacheFileData() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - Thread.sleep(100); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -264,6 +269,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -290,6 +296,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); assertEquals(blockCount, bucketCache.backingMap.size()); @@ -316,6 +323,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 
      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000));

     CacheTestUtils.HFileBlockPair[] blocks =
       CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
@@ -340,6 +348,7 @@ public void testBucketCacheRecovery() throws Exception {
     BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
       DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000));

     assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
     assertEquals(blocks[1].getBlock(),

From 6210a96d9e1f804704eba753708b7f00f038927b Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Thu, 29 Aug 2024 12:16:54 +0530
Subject: [PATCH 2/9] HBASE-28804: Apply spotless and spotbugs changes.

Change-Id: I239c357a135a058650a20c5707c7c7303a248c85
---
 .../hadoop/hbase/io/hfile/BlockCache.java     |  6 +-
 .../hbase/io/hfile/CombinedBlockCache.java    |  3 +-
 .../hbase/io/hfile/HFilePreadReader.java      |  1 +
 .../hbase/io/hfile/bucket/BucketCache.java    | 69 ++++++++++---------
 .../hadoop/hbase/regionserver/HRegion.java    |  6 +-
 .../io/hfile/bucket/TestBucketCache.java      |  2 +-
 6 files changed, 47 insertions(+), 40 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index d14bd18f289c..3454c82bdec5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -272,9 +272,11 @@ default boolean isCacheEnabled() {

   /**
    * Wait for the bucket cache to be enabled during server restart.
    * @param timeout time to wait for the bucket cache to be enabled
    * @return true if the bucket cache is enabled, false otherwise
    */
-  default boolean waitForCacheInitialization(long timeout) { return true; }
+  default boolean waitForCacheInitialization(long timeout) {
+    return true;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 190a47fbf87a..caf5c374f395 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -516,7 +516,6 @@ public boolean waitForCacheInitialization(long timeout) {

   @Override
   public boolean isCacheEnabled() {
-    return l1Cache.isCacheEnabled()
-      && l2Cache.isCacheEnabled();
+    return l1Cache.isCacheEnabled() && l2Cache.isCacheEnabled();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 06d268df3329..8c9d473b53be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -35,6 +35,7 @@ public class HFilePreadReader extends HFileReaderImpl {
   private static final Logger LOG = LoggerFactory.getLogger(HFileReaderImpl.class);
   private static final int WAIT_TIME_FOR_CACHE_INITIALIZATION = 10 * 60 * 1000;
+
   public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, 
CacheConfig cacheConf, Configuration conf) throws IOException { super(context, fileInfo, cacheConf, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 5585e676ea6c..861aad75f9d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -186,6 +186,7 @@ private enum CacheState { // Disabled: State when the cache is disabled. DISABLED } + /** * Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so * that Bucket IO exceptions/errors don't bring down the HBase server. @@ -366,32 +367,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck if (ioEngine instanceof FileIOEngine) { startBucketCachePersisterThread(); } - - Runnable persistentCacheRetriever = () -> { - try { - retrieveFromFile(bucketSizes); - LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); - } catch (IOException ioex) { - LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); - backingMap.clear(); - fullyCachedFiles.clear(); - backingMapValidated.set(true); - try { - bucketAllocator = new BucketAllocator(capacity, bucketSizes); - } catch (BucketAllocatorException ex) { - LOG.error("Exception during Bucket Allocation", ex); - } - regionCachedSize.clear(); - } finally { - this.cacheState = CacheState.ENABLED; - startWriterThreads(); - } - }; - Thread t = new Thread(persistentCacheRetriever); - t.start(); + startPersistenceRetriever(bucketSizes, capacity); } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); - this.cacheState = CacheState.ENABLED;; + this.cacheState = CacheState.ENABLED; + ; startWriterThreads(); } @@ -406,6 +386,31 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName()); } + private void startPersistenceRetriever(int[] bucketSizes, long capacity) { + Runnable persistentCacheRetriever = () -> { + try { + retrieveFromFile(bucketSizes); + LOG.info("Persistent bucket cache recovery from {} is complete.", persistencePath); + } catch (IOException ioex) { + LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex); + backingMap.clear(); + fullyCachedFiles.clear(); + backingMapValidated.set(true); + try { + bucketAllocator = new BucketAllocator(capacity, bucketSizes); + } catch (BucketAllocatorException ex) { + LOG.error("Exception during Bucket Allocation", ex); + } + regionCachedSize.clear(); + } finally { + this.cacheState = CacheState.ENABLED; + startWriterThreads(); + } + }; + Thread t = new Thread(persistentCacheRetriever); + t.start(); + } + private void sanityCheckConfigs() { Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); @@ -429,14 +434,14 @@ private void sanityCheckConfigs() { /** * Called by the constructor to instantiate the writer threads. 
*/ - private void instantiateWriterThreads() { - final String threadName = Thread.currentThread().getName(); - for (int i = 0; i < this.writerThreads.length; ++i) { - this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); - this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); - this.writerThreads[i].setDaemon(true); - } - } + private void instantiateWriterThreads() { + final String threadName = Thread.currentThread().getName(); + for (int i = 0; i < this.writerThreads.length; ++i) { + this.writerThreads[i] = new WriterThread(this.writerQueues.get(i)); + this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); + this.writerThreads[i].setDaemon(true); + } + } /** * Called by the constructor to start the writer threads. Used by tests that need to override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e41b63489c13..e7ac11601337 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1643,9 +1643,9 @@ public Map> close(boolean abort, boolean ignoreStatus, boolean isGracefulStop) throws IOException { // Only allow one thread to close at a time. Serialize them so dual // threads attempting to close will run up against each other. - MonitoredTask status = TaskMonitor.get().createStatus( - "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : " as it is being closed"), - ignoreStatus, true); + MonitoredTask status = + TaskMonitor.get().createStatus("Closing region " + this.getRegionInfo().getEncodedName() + + (abort ? " due to abort" : " as it is being closed"), ignoreStatus, true); status.setStatus("Waiting for close lock"); try { synchronized (closeLock) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 7730d15c8ca1..37bf693c5f51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -238,7 +238,7 @@ public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cac } finally { Thread.sleep(1000); } - assertTrue (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)); + assertTrue(cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)); } public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { From a5a75b7fd15c586b069c68be9ec4dba9241c9804 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 29 Aug 2024 16:59:43 +0530 Subject: [PATCH 3/9] HBASE-28804: Addressing first round of review comments. 
Change-Id: Ie160b249f6a2bff18fd8a577ae32a263b7de25ea --- .../hadoop/hbase/io/hfile/TestPrefetch.java | 11 ++--- .../io/hfile/bucket/TestBucketCache.java | 44 +++++++------------ .../TestRecoveryPersistentBucketCache.java | 13 ++---- 3 files changed, 23 insertions(+), 45 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 1a8d161da555..7a4bad39c7fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; @@ -367,16 +368,10 @@ public void testPrefetchWithDelay() throws Exception { Thread.sleep(20000); assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted()); long timeout = 10000; - while (!reader.prefetchStarted() && !reader.prefetchComplete()) { - // Wait until the prefetch is triggered. - Thread.sleep(500); - if (timeout <= 0) break; - timeout -= 500; - } + Waiter.waitFor(conf, 10000, () -> (reader.prefetchStarted() || reader.prefetchComplete())); + assertTrue(reader.prefetchStarted() || reader.prefetchComplete()); - // Added some delay as we have started the timer a bit late. - Thread.sleep(500); assertTrue("Prefetch should start post configured delay", getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 37bf693c5f51..a0507234f091 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -226,19 +226,9 @@ public void testHeapSizeChanges() throws Exception { public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - long timeout = 120000; - try { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - if (timeout <= 0) { - break; - } - timeout -= 100; - } - } finally { - Thread.sleep(1000); + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); } - assertTrue(cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)); } public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { @@ -312,7 +302,7 @@ public void testRetrieveFromFile() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -340,7 +330,7 @@ public void testRetrieveFromPMem() throws Exception { try { bucketCache = new 
BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -358,7 +348,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); @@ -375,7 +365,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) assertTrue(new File(persistencePath).exists()); bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); } finally { @@ -414,7 +404,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { try { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, persistencePath); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -428,7 +418,7 @@ public void testRetrieveFromMultipleFiles() throws Exception { public void testRetrieveFromFileWithoutPersistence() throws Exception { BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); try { final Path testDir = createAndGetTestDir(); String ioEngineName = "file:" + testDir + "/bucket.cache"; @@ -447,7 +437,7 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception { bucketCache.shutdown(); bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null); - bucketCache.waitForCacheInitialization(10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); } finally { bucketCache.shutdown(); @@ -477,7 +467,7 @@ public void testGetPartitionSize() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); - CacheTestUtils.waitForCacheInitialization(cache, 10000); + assertTrue(cache.waitForCacheInitialization(10000)); validateGetPartitionSize(cache, 0.1f, 0.5f); validateGetPartitionSize(cache, 0.7f, 0.5f); @@ -515,7 +505,7 @@ public void testValidBucketCacheConfigs() throws IOException { BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); - 
CacheTestUtils.waitForCacheInitialization(cache, 10000); + assertTrue(cache.waitForCacheInitialization(10000)); assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f, cache.getAcceptableFactor(), 0); @@ -589,7 +579,7 @@ private void checkConfigValues(Configuration conf, Map configMa } BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, null, 100, conf); - CacheTestUtils.waitForCacheInitialization(cache, 10000); + assertTrue(cache.waitForCacheInitialization(10000)); assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]); } catch (IllegalArgumentException e) { @@ -816,7 +806,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -840,7 +830,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -866,7 +856,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception { bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, persistencePath); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); @@ -898,7 +888,7 @@ public void testBlockAdditionWaitWhenCache() throws Exception { // restore cache from file bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { @@ -955,7 +945,7 @@ private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlock String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, constructedBlockSizes, 1, 1, null); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedByteSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedByteSize); HFileBlockPair[] hfileBlockPairs = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index e1fd70254583..4defd290ec30 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -62,6 +62,8 @@ public void testBucketCacheRecovery() throws Exception { 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 1000)); + assertTrue(bucketCache.isCacheInitialized("testBucketCacheRecovery") + && bucketCache.isCacheEnabled()); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -147,16 +149,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - try { - long timeout = 120000; - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - if (timeout <= 0) { - break; - } - timeout -= 100; - } - } finally { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { Thread.sleep(100); } } From 01c560810465866785a920048974421724ae7e02 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 29 Aug 2024 19:59:07 +0530 Subject: [PATCH 4/9] HBASE-28804: Added unit tests for validation of cache in initialization state. Change-Id: Ib54f565152c391da727be7413dd775fb507daea7 --- .../hbase/io/hfile/bucket/BucketCache.java | 6 ++- .../TestRecoveryPersistentBucketCache.java | 51 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 861aad75f9d2..a4039c2df10e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -178,7 +178,7 @@ public class BucketCache implements BlockCache, HeapSize { /** * Enum to represent the state of cache */ - private enum CacheState { + protected enum CacheState { // Initializing: State when the cache is being initialised from persistence. INITIALIZING, // Enabled: State when cache is initialised and is ready. @@ -904,6 +904,10 @@ public void setCacheInconsistent(boolean setCacheInconsistent) { isCacheInconsistent.set(setCacheInconsistent); } + protected void setCacheState(CacheState state) { + cacheState = state; + } + /* * Statistics thread. Periodically output cache statistics to the log. 
*/ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 4defd290ec30..0f4ed4b9e3d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -147,6 +147,57 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { TEST_UTIL.cleanupTestDir(); } + @Test + public void testValidateCacheInitialization() throws Exception { + HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + Path testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + Configuration conf = HBaseConfiguration.create(); + // Disables the persister thread by setting its interval to MAX_VALUE + conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE); + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", + DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); + + // Add four blocks + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock()); + // saves the current state of the cache + bucketCache.persistToFile(); + + BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", + DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + + // Set the state of bucket cache to INITIALIZING + newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING); + + // Validate that zero values are returned for the cache being initialized. 
+ assertEquals(0, newBucketCache.acceptableSize()); + assertEquals(0, newBucketCache.getPartitionSize(1)); + assertEquals(0, newBucketCache.getFreeSize()); + assertEquals(0, newBucketCache.getCurrentSize()); + assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); + + newBucketCache.setCacheState(BucketCache.CacheState.ENABLED); + + // Validate that non-zero values are returned for enabled cache + assertTrue(newBucketCache.acceptableSize() > 0); + assertTrue(newBucketCache.getPartitionSize(1) > 0); + assertTrue(newBucketCache.getFreeSize() > 0); + assertTrue(newBucketCache.getCurrentSize() > 0); + assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get()); + + TEST_UTIL.cleanupTestDir(); + } + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { From dbb9403251318d0911fa659377c60e2bbd410be9 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 29 Aug 2024 20:57:50 +0530 Subject: [PATCH 5/9] HBASE-28804: Address further review comments. Change-Id: I446475dc84d52403f67762b878af874b9cbe1937 --- .../hadoop/hbase/io/hfile/bucket/TestBucketCache.java | 6 +++--- .../io/hfile/bucket/TestRecoveryPersistentBucketCache.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index a0507234f091..4b14f21c613d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.Waiter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -226,9 +227,8 @@ public void testHeapSizeChanges() throws Exception { public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - } + Waiter.waitFor(HBaseConfiguration.create(), 10000, + () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); } public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 0f4ed4b9e3d8..69c012fca12d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.Waiter; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -200,9 +201,8 @@ public void testValidateCacheInitialization() throws Exception { 
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) throws InterruptedException { - while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { - Thread.sleep(100); - } + Waiter.waitFor(HBaseConfiguration.create(), 12000, + () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey))); } // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer From cb18e0f486a2ebf96a743efbde4ae796c3aedcbf Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Fri, 30 Aug 2024 08:43:19 +0530 Subject: [PATCH 6/9] HBASE-28804: Fix spotless bugs. Change-Id: I49d8e39cafff6cbe103f300235bec4aacd42934d --- .../hadoop/hbase/io/hfile/bucket/TestBucketCache.java | 2 +- .../io/hfile/bucket/TestRecoveryPersistentBucketCache.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 4b14f21c613d..d4b333fee700 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.BlockType; @@ -60,7 +61,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.Waiter; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 69c012fca12d..32b44e1f3e8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.Waiter; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -63,8 +63,8 @@ public void testBucketCacheRecovery() throws Exception { 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 1000)); - assertTrue(bucketCache.isCacheInitialized("testBucketCacheRecovery") - && bucketCache.isCacheEnabled()); + assertTrue( + bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled()); CacheTestUtils.HFileBlockPair[] blocks = 
CacheTestUtils.generateHFileBlocks(8192, 4); From 2f2fce826ea20d68ef7f3a42d6fb20fc09b0b269 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Tue, 3 Sep 2024 18:44:14 +0530 Subject: [PATCH 7/9] HBASE-28804: Address review comments. Change-Id: I8db2162c8237c05fd98d3c771f50e5a92c9b78b5 --- .../hadoop/hbase/io/hfile/BlockCache.java | 3 +-- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 4 ---- .../TestBlockEvictionOnRegionMovement.java | 2 +- .../hadoop/hbase/io/hfile/TestPrefetch.java | 2 -- .../hfile/bucket/TestBucketWriterThread.java | 3 +-- .../hfile/bucket/TestPrefetchPersistence.java | 5 ++-- .../TestRecoveryPersistentBucketCache.java | 12 +++++----- .../bucket/TestVerifyBucketCacheFile.java | 24 +++++++++---------- 8 files changed, 23 insertions(+), 32 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 3454c82bdec5..775bcb99b628 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -272,9 +272,8 @@ default boolean isCacheEnabled() { /** * Wait for the bucket cache to be enabled while server restart - * @param cache cache object * @param timeout time to wait for the bucket cache to be enable - * @return boolean trye if the bucket cache is enabled, false otherwise + * @return boolean true if the bucket cache is enabled, false otherwise */ default boolean waitForCacheInitialization(long timeout) { return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index fb6cd77c270a..848f33bb9c3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -351,8 +351,4 @@ public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, } } } - - public static boolean waitForCacheInitialization(BlockCache cache, long timeout) { - return cache.waitForCacheInitialization(timeout); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java index 3002383700e4..82a58da00697 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -133,7 +133,7 @@ public void testBlockEvictionOnGracefulStop() throws Exception { cluster.startRegionServer(); Thread.sleep(500); - CacheTestUtils.waitForCacheInitialization(regionServingRS.getBlockCache().get(), 10000); + regionServingRS.getBlockCache().get().waitForCacheInitialization(10000); long newUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); assertEquals(oldUsedCacheSize, newUsedCacheSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 7a4bad39c7fb..801b20a47747 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -114,8 +114,6 @@ public void 
setUp() throws IOException, InterruptedException { conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); fs = HFileSystem.get(conf); blockCache = BlockCacheFactory.createBlockCache(conf); - // Add some sleep to enable the cache to be instantiated. - Thread.sleep(2000); cacheConf = new CacheConfig(conf, blockCache); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 6b03f466ac36..facbe7c50d11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; -import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; @@ -85,7 +84,7 @@ public void setUp() throws Exception { final int writerThreadsCount = 1; this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, capacity, null, 100/* Tolerate ioerrors for 100ms */); - CacheTestUtils.waitForCacheInitialization(this.bc, 10000); + this.bc.waitForCacheInitialization(10000); assertEquals(writerThreadsCount, bc.writerThreads.length); assertEquals(writerThreadsCount, bc.writerQueues.size()); // Get reference to our single WriterThread instance. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index c1cd8d96e4ff..09872f894029 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; @@ -110,7 +109,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + bucketCache.waitForCacheInitialization(10000); cacheConf = new CacheConfig(conf, bucketCache); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -129,7 +128,7 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - CacheTestUtils.waitForCacheInitialization(bucketCache, 10000); + bucketCache.waitForCacheInitialization(10000); cacheConf = new CacheConfig(conf, bucketCache); assertTrue(usedSize != 0); 
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java index 32b44e1f3e8e..3a4af295dc84 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java @@ -62,7 +62,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 1000)); + assertTrue(bucketCache.waitForCacheInitialization(1000)); assertTrue( bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled()); @@ -100,7 +100,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + assertTrue(newBucketCache.waitForCacheInitialization(1000)); assertEquals(3, newBucketCache.backingMap.size()); assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false)); @@ -126,7 +126,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -141,7 +141,7 @@ public void testBucketCacheEvictByHFileAfterRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertEquals(4, newBucketCache.backingMap.size()); newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName()); assertEquals(3, newBucketCache.backingMap.size()); @@ -160,7 +160,7 @@ public void testValidateCacheInitialization() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4); @@ -175,7 +175,7 @@ public void testValidateCacheInitialization() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, 
writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); // Set the state of bucket cache to INITIALIZING newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 05951f77dd86..50fe9ba113e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -100,7 +100,7 @@ public void testRetrieveFromFile() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); CacheTestUtils.HFileBlockPair[] blocks = @@ -117,7 +117,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); // persist cache to file bucketCache.shutdown(); @@ -130,7 +130,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); // Add blocks @@ -150,7 +150,7 @@ public void testRetrieveFromFile() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -168,7 +168,7 @@ public void testRetrieveFromFileAfterDelete() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -190,7 +190,7 @@ public void testRetrieveFromFileAfterDelete() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 * 1000, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); } @@ -214,7 +214,7 @@ public void testModifiedBucketCacheFileData() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -239,7 +239,7 @@ public void testModifiedBucketCacheFileData() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); @@ -269,7 +269,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); long usedSize = bucketCache.getAllocator().getUsedSize(); assertEquals(0, usedSize); @@ -296,7 +296,7 @@ public void testModifiedBucketCacheFileTime() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); assertEquals(blockCount, bucketCache.backingMap.size()); @@ -323,7 +323,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(bucketCache, 10000)); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4); @@ -348,7 +348,7 @@ public void testBucketCacheRecovery() throws Exception { BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); - assertTrue(CacheTestUtils.waitForCacheInitialization(newBucketCache, 10000)); + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false)); 
assertEquals(blocks[1].getBlock(), From 06aa65cdd720c0f94bc762e4c1d58590537cd687 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Wed, 4 Sep 2024 14:46:46 +0530 Subject: [PATCH 8/9] HBASE-28804: Fix unit test. Change-Id: I391e5e453e077b7e5733cd997130467b356a3e66 --- .../apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 7 ++++--- .../hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index a4039c2df10e..0e040ce4ceb2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1436,7 +1436,8 @@ void persistToFile() throws IOException { persistChunkedBackingMap(fos); - LOG.debug("PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}, file name: {}", + LOG.debug( + "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}, file name: {}", backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); } catch (IOException e) { LOG.error("Failed to persist bucket cache to file", e); @@ -1623,8 +1624,8 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio fullyCachedFiles.clear(); fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap())); - LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", - backingMap.size(), fullyCachedFiles.size()); + LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size: {}", backingMap.size(), + fullyCachedFiles.size()); verifyFileIntegrity(proto); updateRegionSizeMapWhileRetrievingFromFile(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java index 50fe9ba113e6..21b71fa79b8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java @@ -381,6 +381,7 @@ private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); + assertTrue(bucketCache.waitForCacheInitialization(10000)); CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks); @@ -396,7 +397,7 @@ private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName, DEFAULT_ERROR_TOLERATION_DURATION, conf); - + assertTrue(newBucketCache.waitForCacheInitialization(10000)); assertEquals(numBlocks, newBucketCache.backingMap.size()); for (int i = 0; i < numBlocks; i++) { From 2c8438d2aa6f5f56f785a45b065e4962ad7fa685 Mon Sep 17 00:00:00 2001 From: Janardhan Hungund Date: Thu, 5 Sep 2024 14:27:07 +0530 Subject: [PATCH 9/9] HBASE-28804: Fixed check-style warnings Change-Id: Ieb6b5841cb7b23d919fbebbe420c0083167d6d0b --- 
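A minimal usage sketch of the initialization handshake exercised throughout these tests, assuming the test-scope names used above (testDir, capacitySize, bucketSizes, writeThreads, writerQLen, conf and DEFAULT_ERROR_TOLERATION_DURATION):

    // Construction returns immediately; the cache starts in INITIALIZING
    // and moves to ENABLED once recovery from the persistence file is done.
    BucketCache cache = new BucketCache("file:" + testDir + "/bucket.cache",
      capacitySize, 8192, bucketSizes, writeThreads, writerQLen,
      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
    // Bounded wait for initialization; true only once the cache is ENABLED.
    assertTrue(cache.waitForCacheInitialization(10000));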
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 0e040ce4ceb2..11ec958b08a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -371,7 +371,6 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck } else { bucketAllocator = new BucketAllocator(capacity, bucketSizes); this.cacheState = CacheState.ENABLED; - ; startWriterThreads(); } @@ -927,7 +926,9 @@ public void run() { } public void logStats() { - if (!isCacheInitialized("BucketCache::logStats")) return; + if (!isCacheInitialized("BucketCache::logStats")) { + return; + } long totalSize = bucketAllocator.getTotalSize(); long usedSize = bucketAllocator.getUsedSize(); @@ -1437,7 +1438,8 @@ void persistToFile() throws IOException { persistChunkedBackingMap(fos); LOG.debug( - "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}, file name: {}", + "PersistToFile: after persisting backing map size: {}, fullycachedFiles size: {}," + + " file name: {}", backingMap.size(), fullyCachedFiles.size(), tempPersistencePath.getName()); } catch (IOException e) { LOG.error("Failed to persist bucket cache to file", e); @@ -1704,7 +1706,9 @@ private void checkIOErrorIsTolerated() { * Used to shut down the cache -or- turn it off in the case of something broken. */ private void disableCache() { - if (!isCacheEnabled()) return; + if (!isCacheEnabled()) { + return; + } LOG.info("Disabling cache"); cacheState = CacheState.DISABLED; ioEngine.shutdown();