From 686b8757b46d9765fd422bc3178700540933890a Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Wed, 28 Sep 2022 23:36:30 +0530 Subject: [PATCH 1/4] HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time --- .../hbase/io/hfile/MemcachedBlockCache.java | 5 ++ .../hadoop/hbase/io/hfile/BlockCache.java | 10 ++++ .../hbase/io/hfile/CombinedBlockCache.java | 9 +++- .../hbase/io/hfile/HFileReaderImpl.java | 4 +- .../hbase/io/hfile/HFileWriterImpl.java | 2 +- .../hbase/io/hfile/LruAdaptiveBlockCache.java | 5 ++ .../hadoop/hbase/io/hfile/LruBlockCache.java | 5 ++ .../hbase/io/hfile/TinyLfuBlockCache.java | 5 ++ .../hbase/io/hfile/bucket/BucketCache.java | 20 ++++++- .../io/hfile/bucket/TestBucketCache.java | 53 +++++++++++++++++++ .../regionserver/TestHeapMemoryManager.java | 5 ++ .../src/test/resources/hbase-site.xml | 5 ++ 12 files changed, 123 insertions(+), 5 deletions(-) diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index d5c6bff8cd01..bf7a58f5ef4b 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -111,6 +111,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) cacheBlock(cacheKey, buf); } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, buf, inMemory); + } + @SuppressWarnings("FutureReturnValueIgnored") @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 7660a42ba5e2..4421af4c4ed4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -34,6 +34,16 @@ public interface BlockCache extends Iterable { */ void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory); + /** + * Add block to cache. + * @param cacheKey The block's cache key. + * @param buf The block contents wrapped in a ByteBuffer. + * @param inMemory Whether block should be treated as in-memory + * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache + * is configured. + */ + void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, boolean waitWhenCache); + /** * Add block to cache (defaults to not in-memory). * @param cacheKey The block's cache key. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 6cd40b0858f5..be807c04c121 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -53,11 +53,16 @@ public long heapSize() { @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { + cacheBlock(cacheKey, buf, inMemory, false); + } + + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { boolean metaBlock = isMetaBlock(buf.getBlockType()); if (metaBlock) { - l1Cache.cacheBlock(cacheKey, buf, inMemory); + l1Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache); } else { - l2Cache.cacheBlock(cacheKey, buf, inMemory); + l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index d47db18bad9d..c84bc0d57a6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1341,8 +1341,10 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + // Using the wait on cache during compaction and prefetching. cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, - cacheConf.isInMemory()); + cacheConf.isInMemory(), isCompaction || + (pread && expectedBlockType == null && expectedDataBlockEncoding == null)); } }); if (unpacked != hfileBlock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 6b7cf3caaa39..b33d471ae499 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) { HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); try { cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), - cacheFormatBlock); + cacheFormatBlock, cacheConf.isInMemory(), true); } finally { // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent cacheFormatBlock.release(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java index 87932074bff1..82fc73e5e9ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java @@ -552,6 +552,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) } } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, buf, inMemory); + } + /** * Sanity-checking for parity between actual block cache content and metrics. Intended only for * use with TRACE level logging and -ea JVM. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index a3f883745e06..8ba73e5a299f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -477,6 +477,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { cacheBlock(cacheKey, buf, false); } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, buf, inMemory); + } + /** * Helper function that updates the local size counter and also updates any per-cf or * per-blocktype metrics it can discern from given {@link LruCachedBlock} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java index 7852f19bd63e..e55879b6c259 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java @@ -182,6 +182,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory cacheBlock(cacheKey, value); } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, value); + } + @Override public void cacheBlock(BlockCacheKey key, Cacheable value) { if (value.heapSize() > maxBlockSize) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 2e2ae4865653..54f8d954a2e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -248,6 +248,10 @@ public class BucketCache implements BlockCache, HeapSize { "hbase.bucketcache.persistent.file.integrity.check.algorithm"; private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; + private static final String QUEUE_ADDITION_WAIT_TIME = + "hbase.bucketcache.queue.addition.waittime"; + private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; + private long queueAdditionWaitTime; /** * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file * integrity, default algorithm is MD5 @@ -288,6 +292,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); + this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, + DEFAULT_QUEUE_ADDITION_WAIT_TIME); this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); sanityCheckConfigs(); @@ -435,6 +441,18 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inM cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); } + /** + * Cache the block with the specified name and buffer. + * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, + boolean waitWhenCache) { + cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); + } + /** * Cache the block to ramCache * @param cacheKey block's cache key @@ -491,7 +509,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach boolean successfulAddition = false; if (wait) { try { - successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); + successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 908d6ce73267..018a9ffdea79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -811,4 +812,56 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { } } + /** + * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file + * could not be freed even if corresponding {@link HFileBlock} is evicted from + * {@link BucketCache}. + */ + @Test + public void testBlockAdditionWaitWhenCache() throws Exception { + try { + final Path dataTestDir = createAndGetTestDir(); + + String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; + String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; + + BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, 1, 1, persistencePath); + long usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedByteSize); + + HFileBlockPair[] hfileBlockPairs = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); + // Add blocks + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), + false, true); + } + + // Wait for blocks size to match the number of blocks. + while(bucketCache.backingMap.size() != 10) { + Threads.sleep(100); + } + usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedByteSize); + // persist cache to file + bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); + + // restore cache from file + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); + + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); + bucketCache.evictBlock(blockCacheKey); + } + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + HBASE_TESTING_UTILITY.cleanupTestDir(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 443019bee808..46a0635b8ca2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -651,6 +651,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { } + @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + + } + @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml index 0b6f1d59a0e9..5544ae787b33 100644 --- a/hbase-server/src/test/resources/hbase-site.xml +++ b/hbase-server/src/test/resources/hbase-site.xml @@ -277,4 +277,9 @@ 3 Default is unbounded + + hbase.bucketcache.queue.addition.waittime + 100 + Default is 0 + From 4b659fa5a680e95acc6da82cae59b2b7dd52baaf Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Fri, 30 Sep 2022 21:27:38 +0530 Subject: [PATCH 2/4] Handling review comments --- .../hbase/io/hfile/MemcachedBlockCache.java | 5 ---- .../hadoop/hbase/io/hfile/BlockCache.java | 5 +++- .../hbase/io/hfile/CombinedBlockCache.java | 2 +- .../hbase/io/hfile/HFileReaderImpl.java | 6 ++--- .../hbase/io/hfile/LruAdaptiveBlockCache.java | 5 ---- .../hadoop/hbase/io/hfile/LruBlockCache.java | 5 ---- .../hbase/io/hfile/TinyLfuBlockCache.java | 5 ---- .../hbase/io/hfile/bucket/BucketCache.java | 9 +------ .../io/hfile/bucket/TestBucketCache.java | 24 ++++++++++++------- .../hfile/bucket/TestBucketCacheRefCnt.java | 2 -- .../hfile/bucket/TestPrefetchPersistence.java | 2 -- .../regionserver/TestHeapMemoryManager.java | 5 ---- 12 files changed, 24 insertions(+), 51 deletions(-) diff --git a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index bf7a58f5ef4b..d5c6bff8cd01 100644 --- a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -111,11 +111,6 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) cacheBlock(cacheKey, buf); } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, - boolean waitWhenCache) { - cacheBlock(cacheKey, buf, inMemory); - } - @SuppressWarnings("FutureReturnValueIgnored") @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 4421af4c4ed4..4dde8ebc7811 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -42,7 +42,10 @@ public interface BlockCache extends Iterable { * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache * is configured. */ - void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, boolean waitWhenCache); + default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, buf, inMemory); + } /** * Add block to cache (defaults to not in-memory). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index be807c04c121..48bd79aabc6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -60,7 +60,7 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) boolean waitWhenCache) { boolean metaBlock = isMetaBlock(buf.getBlockType()); if (metaBlock) { - l1Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache); + l1Cache.cacheBlock(cacheKey, buf, inMemory); } else { l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index c84bc0d57a6a..65b0b0e383cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1328,7 +1328,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); + cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), + isCompaction || cacheOnly); } }); @@ -1343,8 +1344,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, - cacheConf.isInMemory(), isCompaction || - (pread && expectedBlockType == null && expectedDataBlockEncoding == null)); + cacheConf.isInMemory(), isCompaction || cacheOnly); } }); if (unpacked != hfileBlock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java index 82fc73e5e9ed..87932074bff1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java @@ -552,11 +552,6 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) } } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, - boolean waitWhenCache) { - cacheBlock(cacheKey, buf, inMemory); - } - /** * Sanity-checking for parity between actual block cache content and metrics. Intended only for * use with TRACE level logging and -ea JVM. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 8ba73e5a299f..a3f883745e06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -477,11 +477,6 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { cacheBlock(cacheKey, buf, false); } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, - boolean waitWhenCache) { - cacheBlock(cacheKey, buf, inMemory); - } - /** * Helper function that updates the local size counter and also updates any per-cf or * per-blocktype metrics it can discern from given {@link LruCachedBlock} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java index e55879b6c259..7852f19bd63e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java @@ -182,11 +182,6 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory cacheBlock(cacheKey, value); } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory, - boolean waitWhenCache) { - cacheBlock(cacheKey, value); - } - @Override public void cacheBlock(BlockCacheKey key, Cacheable value) { if (value.heapSize() > maxBlockSize) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 54f8d954a2e9..a82d75b53f84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -175,13 +175,6 @@ public class BucketCache implements BlockCache, HeapSize { private static final int DEFAULT_CACHE_WAIT_TIME = 50; - /** - * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip - * some blocks when caching. If the flag is true, we will wait until blocks are flushed to - * IOEngine. - */ - boolean wait_when_cache = false; - private final BucketCacheStats cacheStats = new BucketCacheStats(); private final String persistencePath; @@ -438,7 +431,7 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { */ @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { - cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); + cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 018a9ffdea79..6234f96b059c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -120,7 +120,6 @@ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[ int writerThreads, int writerQLen, String persistencePath) throws IOException { super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, persistencePath); - super.wait_when_cache = true; } @Override @@ -242,8 +241,8 @@ public static void waitUntilAllFlushedToBucket(BucketCache cache) throws Interru // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer // threads will flush it to the bucket and put reference entry in backingMap. private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, - Cacheable block) throws InterruptedException { - cache.cacheBlock(cacheKey, block); + Cacheable block, boolean waitWhenCache) throws InterruptedException { + cache.cacheBlock(cacheKey, block, false, waitWhenCache); waitUntilFlushedToBucket(cache, cacheKey); } @@ -251,7 +250,7 @@ private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey c public void testMemoryLeak() throws Exception { final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); cacheAndWaitUntilFlushedToBucket(cache, cacheKey, - new CacheTestUtils.ByteArrayCacheable(new byte[10])); + new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); long lockId = cache.backingMap.get(cacheKey).offset(); ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); lock.writeLock().lock(); @@ -266,7 +265,7 @@ public void run() { cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); assertEquals(0, cache.getBlockCount()); cacheAndWaitUntilFlushedToBucket(cache, cacheKey, - new CacheTestUtils.ByteArrayCacheable(new byte[10])); + new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); assertEquals(1, cache.getBlockCount()); lock.writeLock().unlock(); evictThread.join(); @@ -342,7 +341,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); } for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false); } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); @@ -404,7 +403,7 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception { bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); } for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false); } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); @@ -787,7 +786,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception { for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), - hfileBlockPair.getBlock()); + hfileBlockPair.getBlock(), false); } usedByteSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedByteSize); @@ -838,9 +837,16 @@ public void testBlockAdditionWaitWhenCache() throws Exception { false, true); } + // Max wait for 10 seconds. + long timeout = 10000; // Wait for blocks size to match the number of blocks. - while(bucketCache.backingMap.size() != 10) { + while(bucketCache.backingMap.size() != 10 ) { + if (timeout <= 0) break; Threads.sleep(100); + timeout=-100; + } + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); } usedByteSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedByteSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java index 44a398bda44b..a3f291b7949c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java @@ -110,8 +110,6 @@ private void disableWriter() { // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2> public void testBlockInRAMCache() throws IOException { cache = create(1, 1000); - // Set this to true; - cache.wait_when_cache = true; disableWriter(); final String prefix = "testBlockInRamCache"; try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 6a5c9dcf2db3..771ab0158f61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -117,7 +117,6 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - bucketCache.wait_when_cache = true; cacheConf = new CacheConfig(conf, bucketCache); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -137,7 +136,6 @@ public void testPrefetchPersistence() throws Exception { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - bucketCache.wait_when_cache = true; assertFalse(new File(testDir + "/bucket.persistence").exists()); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertTrue(usedSize != 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 46a0635b8ca2..443019bee808 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -651,11 +651,6 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, - boolean waitWhenCache) { - - } - @Override public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, boolean updateCacheMetrics) { From 9a395cffccbdfd8d5701d672b24255e30ddbed60 Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Fri, 30 Sep 2022 23:09:19 +0530 Subject: [PATCH 3/4] Removed caching compacting blocks --- .../org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 65b0b0e383cf..1caca6abf4e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1328,8 +1328,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), - isCompaction || cacheOnly); + cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly); } }); @@ -1343,8 +1342,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, - cacheConf.isInMemory(), isCompaction || cacheOnly); + cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly); } }); if (unpacked != hfileBlock) { From ccd2caa3e1f8d5b83247d8a896e88b3056716bd5 Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Sat, 1 Oct 2022 09:29:42 +0530 Subject: [PATCH 4/4] Handling checkstyles and spotbugs --- .../apache/hadoop/hbase/io/hfile/BlockCache.java | 10 +++++----- .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 3 ++- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 6 +++--- .../hbase/io/hfile/bucket/TestBucketCache.java | 14 ++++++++------ 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 4dde8ebc7811..8419ccb6c1cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -36,11 +36,11 @@ public interface BlockCache extends Iterable { /** * Add block to cache. - * @param cacheKey The block's cache key. - * @param buf The block contents wrapped in a ByteBuffer. - * @param inMemory Whether block should be treated as in-memory - * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache - * is configured. + * @param cacheKey The block's cache key. + * @param buf The block contents wrapped in a ByteBuffer. + * @param inMemory Whether block should be treated as in-memory + * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is + * configured. */ default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, boolean waitWhenCache) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 48bd79aabc6e..d616d6f40d9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -56,7 +56,8 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) cacheBlock(cacheKey, buf, inMemory, false); } - @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, boolean waitWhenCache) { boolean metaBlock = isMetaBlock(buf.getBlockType()); if (metaBlock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index a82d75b53f84..18295f285c49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -285,8 +285,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); - this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME, - DEFAULT_QUEUE_ADDITION_WAIT_TIME); + this.queueAdditionWaitTime = + conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); sanityCheckConfigs(); @@ -502,7 +502,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach boolean successfulAddition = false; if (wait) { try { - successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); + successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 6234f96b059c..a132673e8464 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -341,7 +341,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName) bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); } for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false); + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), + false); } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); @@ -403,7 +404,8 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception { bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); } for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false); + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), + false); } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); @@ -833,17 +835,17 @@ public void testBlockAdditionWaitWhenCache() throws Exception { CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); // Add blocks for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { - bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), - false, true); + bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, + true); } // Max wait for 10 seconds. long timeout = 10000; // Wait for blocks size to match the number of blocks. - while(bucketCache.backingMap.size() != 10 ) { + while (bucketCache.backingMap.size() != 10) { if (timeout <= 0) break; Threads.sleep(100); - timeout=-100; + timeout = -100; } for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));