From 675c889796e149910f5270039b143e39229a7b7f Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 25 May 2021 16:02:44 +0100 Subject: [PATCH 01/12] HBASE-25392 Direct insert compacted HFiles into data directory. --- ...xt.java => DirectInStoreFlushContext.java} | 6 +- ...Flusher.java => DirectInStoreFlusher.java} | 4 +- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/HStore.java | 29 ++-- .../regionserver/compactions/Compactor.java | 58 ++++++++ .../compactions/DefaultCompactor.java | 2 +- .../compactions/DirectInStoreCompactor.java | 78 +++++++++++ ...ava => TestDirectInStoreFlushContext.java} | 9 +- ...her.java => TestDirectInStoreFlusher.java} | 8 +- .../hadoop/hbase/regionserver/TestHStore.java | 47 +++++++ .../compactions/TestDefaultCompactor.java | 121 ++++++++++++++++ .../TestDirectInStoreCompactor.java | 132 ++++++++++++++++++ 12 files changed, 461 insertions(+), 35 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{PersistedStoreFlushContext.java => DirectInStoreFlushContext.java} (90%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{PersistedEngineStoreFlusher.java => DirectInStoreFlusher.java} (95%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestPersistedStoreFlushContext.java => TestDirectInStoreFlushContext.java} (90%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestPersistedEngineStoreFlusher.java => TestDirectInStoreFlusher.java} (95%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java similarity index 90% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java index 0031b0e498ea..874996bba84d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java @@ -33,11 +33,11 @@ * To be used only when PersistedStoreEngine is configured as the StoreEngine implementation. 
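 *
 * <p>A minimal sketch of wiring this in, assuming PersistedStoreEngine lives in this package
 * ("hbase.hstore.engine.class" is the standard StoreEngine override key):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, // i.e. "hbase.hstore.engine.class"
 *   "org.apache.hadoop.hbase.regionserver.PersistedStoreEngine");
 * </pre>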
*/ @InterfaceAudience.Private -public class PersistedStoreFlushContext extends DefaultStoreFlushContext { +public class DirectInStoreFlushContext extends DefaultStoreFlushContext { - private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFlushContext.class); + private static final Logger LOG = LoggerFactory.getLogger(DirectInStoreFlushContext.class); - public PersistedStoreFlushContext(HStore store, Long cacheFlushSeqNum, + public DirectInStoreFlushContext(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { super.init(store, cacheFlushSeqNum, tracker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java similarity index 95% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java index 9094e58b938c..ba52c7518fbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java @@ -36,9 +36,9 @@ * instead of a temp dir.. */ @InterfaceAudience.Private -public class PersistedEngineStoreFlusher extends DefaultStoreFlusher { +public class DirectInStoreFlusher extends DefaultStoreFlusher { - public PersistedEngineStoreFlusher(Configuration conf, HStore store) { + public DirectInStoreFlusher(Configuration conf, HStore store) { super(conf, store); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5a429aa4c693..7c99069d632a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1372,7 +1372,7 @@ public RegionInfo getRegionInfo() { * @return Instance of {@link RegionServerServices} used by this HRegion. * Can be null. 
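 * Made public so that compaction code in other packages can look up favored nodes
 * when creating store file writers directly in the store directory.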
*/ - RegionServerServices getRegionServerServices() { + public RegionServerServices getRegionServerServices() { return this.rsServices; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index ce0c641fbb34..e55f03cc200b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -228,6 +228,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, private long blockingFileCount; private int compactionCheckMultiplier; + public Encryption.Context getCryptoContext() { + return cryptoContext; + } + + protected Encryption.Context cryptoContext = Encryption.Context.NONE; + AtomicLong flushedCellsCount = new AtomicLong(); private AtomicLong compactedCellsCount = new AtomicLong(); private AtomicLong majorCompactedCellsCount = new AtomicLong(); @@ -694,7 +700,7 @@ private void refreshStoreFilesInternal(Collection newFiles) throw refreshStoreSizeAndTotalBytes(); } - protected HStoreFile createStoreFileAndReader(final Path p) throws IOException { + public HStoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p, isPrimaryReplicaStore()); return createStoreFileAndReader(info); @@ -1192,7 +1198,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm return builder.build(); } - HFileContext createFileContext(Compression.Algorithm compression, + public HFileContext createFileContext(Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) { if (compression == null) { compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; @@ -1508,7 +1514,7 @@ protected List doCompaction(CompactionRequestImpl cr, List newFiles) throws IOException { // Do the steps necessary to complete the compaction. setStoragePolicyFromFileName(newFiles); - List sfs = moveCompactedFilesIntoPlace(cr, newFiles, user); + List sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); if (cr.isMajor()) { @@ -1549,21 +1555,6 @@ private void setStoragePolicyFromFileName(List newFiles) throws IOExceptio } } - private List moveCompactedFilesIntoPlace(CompactionRequestImpl cr, - List newFiles, User user) throws IOException { - List sfs = new ArrayList<>(newFiles.size()); - for (Path newFile : newFiles) { - assert newFile != null; - HStoreFile sf = moveFileIntoPlace(newFile); - if (this.getCoprocessorHost() != null) { - getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); - } - assert sf != null; - sfs.add(sf); - } - return sfs; - } - // Package-visible for tests HStoreFile moveFileIntoPlace(Path newFile) throws IOException { validateStoreFile(newFile); @@ -2014,7 +2005,7 @@ protected void finishCompactionRequest(CompactionRequestImpl cr) { * operation. 
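 * Public so that {@link org.apache.hadoop.hbase.regionserver.compactions.Compactor}
 * implementations can validate newly compacted files before committing them.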
* @param path the path to the store file */ - private void validateStoreFile(Path path) throws IOException { + public void validateStoreFile(Path path) throws IOException { HStoreFile storeFile = null; try { storeFile = createStoreFileAndReader(path); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 6483111c6f62..d4db75c5a20e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -266,6 +266,7 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s * @param fd The file details. * @return Writer for a new StoreFile in the tmp dir. * @throws IOException if creation failed + * @deprecated Use initWriter instead. */ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { @@ -278,6 +279,21 @@ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDr HConstants.EMPTY_STRING); } + /** + * Default method for initializing a StoreFileWriter in the compaction process, this creates the + * resulting files on a temp directory. Therefore, upon compaction commit time, these files + * should be renamed into the actual store dir. + * @param fd + * @param shouldDropBehind + * @param major + * @return Writer for a new StoreFile in the tmp dir. + * @throws IOException + */ + protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) + throws IOException { + return this.createTmpWriter(fd, shouldDropBehind, major); + } + protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, String fileStoragePolicy, boolean major) throws IOException { return store.createWriterInTmp(fd.maxKeyCount, @@ -533,4 +549,46 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } + + /** + * Default implementation for committing store files created after a compaction. Assumes new files + * had been created on a temp directory, so it renames those files into the actual store dir, + * then create a reader and cache it into the store. + * @param cr the compaction request. + * @param newFiles the new files created by this compaction under a temp dir. + * @param user the running user/ + * @return A list of the resulting store files already placed in the store dir and loaded into the + * store cache. + * @throws IOException + */ + public List commitCompaction(CompactionRequestImpl cr, List newFiles, User user) + throws IOException { + List sfs = new ArrayList<>(newFiles.size()); + for (Path newFile : newFiles) { + assert newFile != null; + this.store.validateStoreFile(newFile); + // Move the file into the right spot + HStoreFile sf = createFileInStoreDir(newFile); + if (this.store.getCoprocessorHost() != null) { + this.store.getCoprocessorHost().postCompact(this.store, sf, cr.getTracker(), cr, user); + } + assert sf != null; + sfs.add(sf); + } + return sfs; + } + + /** + * Assumes new file was created initially on a temp folder. + * Moves the new file from temp to the actual store directory, then create the related + * HStoreFile instance + * @param newFile the new file created. + * @return an HStoreFile instance. 
+ * @throws IOException + */ + protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { + Path destPath = this.store.getRegionFileSystem(). + commitStoreFile(this.store.getColumnFamilyName(), newFile); + return this.store.createStoreFileAndReader(destPath); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 839d067ecd14..108aea67d9cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -53,7 +53,7 @@ public DefaultCompactor(Configuration conf, HStore store) { public StoreFileWriter createWriter(InternalScanner scanner, org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { - return createTmpWriter(fd, shouldDropBehind, major); + return initWriter(fd, shouldDropBehind, major); } }; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java new file mode 100644 index 000000000000..c606321216de --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java @@ -0,0 +1,78 @@ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +@InterfaceAudience.Private +public class DirectInStoreCompactor extends DefaultCompactor { + public DirectInStoreCompactor(Configuration conf, HStore store) { + super(conf, store); + } + + + @Override + protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) + throws IOException { + // When all MVCC readpoints are 0, don't write them. + // See HBASE-8166, HBASE-12600, and HBASE-13389. + return createWriterInFamilyDir(fd.maxKeyCount, + major ? majorCompactionCompression : minorCompactionCompression, + fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind); + } + + private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, Compression.Algorithm compression, + boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind) + throws IOException { + final CacheConfig writerCacheConf; + // Don't cache data on write on compactions. 
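+    // A private copy of the store's CacheConfig is taken below, so disabling
+    // cache-on-write for this writer does not alter the store-wide cache settings.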
+ writerCacheConf = new CacheConfig(store.getCacheConfig()); + writerCacheConf.setCacheDataOnWrite(false); + + InetSocketAddress[] favoredNodes = null; + if (store.getHRegion().getRegionServerServices() != null) { + favoredNodes = store.getHRegion().getRegionServerServices().getFavoredNodesForRegion( + store.getHRegion().getRegionInfo().getEncodedName()); + } + HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, includesTag, + store.getCryptoContext()); + Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(), + store.getColumnFamilyDescriptor().getNameAsString()); + StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, + store.getFileSystem()) + .withOutputDir(familyDir) + .withBloomType(store.getColumnFamilyDescriptor().getBloomFilterType()) + .withMaxKeyCount(maxKeyCount) + .withFavoredNodes(favoredNodes) + .withFileContext(hFileContext) + .withShouldDropCacheBehind(shouldDropBehind) + .withCompactedFilesSupplier(() -> store.getCompactedFiles()); + return builder.build(); + } + + /** + * Overrides Compactor original implementation, assuming the passed file is already in the store + * directory, thus it only creates the related HStoreFile for the passed Path. + * @param newFile the new file created. + * @return + * @throws IOException + */ + @Override + protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { + return this.store.createStoreFileAndReader(newFile); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java similarity index 90% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java index 298116e69e9e..9f5c75b9c2ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -38,14 +37,14 @@ import org.mockito.ArgumentCaptor; /** - * Test class for the TestPersistedStoreFlushContext + * Test class for DirectInStoreFlushContext */ @Category({ RegionServerTests.class, MediumTests.class }) -public class TestPersistedStoreFlushContext { +public class TestDirectInStoreFlushContext { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestPersistedStoreFlushContext.class); + HBaseClassTestRule.forClass(TestDirectInStoreFlushContext.class); @Rule public TestName name = new TestName(); @@ -63,7 +62,7 @@ public void testCommit() throws Exception { HStoreFile mockStoreFile = mock(HStoreFile.class); when(mockStoreFile.getReader()).thenReturn(mock(StoreFileReader.class)); when(mockStore.createStoreFileAndReader(captor.capture())).thenReturn(mockStoreFile); - PersistedStoreFlushContext context = new PersistedStoreFlushContext(mockStore, + DirectInStoreFlushContext context = new DirectInStoreFlushContext(mockStore, 0L, 
FlushLifeCycleTracker.DUMMY); context.tempFiles = new ArrayList<>(); context.tempFiles.add(filePath); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java similarity index 95% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java index e1fb6d04dd55..7c4874868b93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java @@ -53,14 +53,14 @@ import org.junit.rules.TestName; /** - * Test class for the TestPersistedStoreFlushContext + * Test class for DirectInStoreFlusher */ @Category({ RegionServerTests.class, MediumTests.class }) -public class TestPersistedEngineStoreFlusher { +public class TestDirectInStoreFlusher { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestPersistedEngineStoreFlusher.class); + HBaseClassTestRule.forClass(TestDirectInStoreFlusher.class); @Rule public TestName name = new TestName(); @@ -115,7 +115,7 @@ public void setup() throws Exception { @Test public void testCreateWriter() throws Exception { - PersistedEngineStoreFlusher flusher = new PersistedEngineStoreFlusher(config, mockStore); + DirectInStoreFlusher flusher = new DirectInStoreFlusher(config, mockStore); List files = flusher.flushSnapshot(mockSnapshot, 0, mock(MonitoredTask.class), null, FlushLifeCycleTracker.DUMMY); assertEquals(1, files.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 20d7a29898ac..4811d422655f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -92,6 +93,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; @@ -115,6 +117,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1731,6 +1734,33 @@ public void testHFileContextSetWithCFAndTable() throws Exception { assertArrayEquals(table, hFileContext.getTableName()); } + @Test + public void testDoCompactionDelegatesCommit() throws Exception { + final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + 
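// DEFAULT_COMPACTOR_CLASS_KEY swaps DummyCompactor into DefaultStoreEngine, so the test can observe that doCompaction delegates its commit step to the compactor.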
conf.set(DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
+    HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
+    store = init(this.name.getMethodName(), conf);
+    Path storeDir = this.store.getStoreContext().getRegionFileSystem().getStoreDir("family");
+    StoreFileWriter w = new StoreFileWriter.Builder(conf, new CacheConfig(conf),
+      store.getFileSystem())
+      .withOutputDir(storeDir)
+      .withFileContext(fileContext)
+      .build();
+    w.appendMetadata(1, false);
+    w.close();
+    HStoreFile mockStoreFile = mock(HStoreFile.class);
+    List mockFiles = new ArrayList<>();
+    mockFiles.add(mockStoreFile);
+    List files = new ArrayList<>();
+    files.add(w.getPath());
+    DummyCompactor.countDownLatch = new CountDownLatch(1);
+    try {
+      store.doCompaction(mock(CompactionRequestImpl.class),
+        null, mock(User.class), 0, files);
+    } catch (Throwable e) { /* DummyCompactor returns null; only the latch count matters here */ }
+    assertEquals(0, DummyCompactor.countDownLatch.getCount());
+  }
+
   private HStoreFile mockStoreFileWithLength(long length) {
     HStoreFile sf = mock(HStoreFile.class);
     StoreFileReader sfr = mock(StoreFileReader.class);
@@ -1930,4 +1960,21 @@ public boolean add(T e) {
   @Override
   public List subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
 }
+
+  private static class DummyCompactor extends DefaultCompactor {
+
+    static CountDownLatch countDownLatch;
+
+    public DummyCompactor(Configuration conf, HStore store) {
+      super(conf, store);
+    }
+
+    @Override
+    public List commitCompaction(CompactionRequestImpl cr,
+      List newFiles, User user) {
+      countDownLatch.countDown();
+      return null;
+    }
+
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java
new file mode 100644
index 000000000000..64a0f464ca5b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static junit.framework.TestCase.assertEquals; +import static org.mockito.Mockito.mock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test class for DirectInStoreCompactor. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDefaultCompactor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultCompactor.class); + + @Rule + public TestName name = new TestName(); + + private final Configuration config = new Configuration(); + private HStore store; + private final String cfName = "cf"; + private Compactor.FileDetails mockFileDetails; + + private HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private TableName table; + + @Before + public void setup() throws Exception { + UTIL.startMiniCluster(); + table = TableName.valueOf(name.getMethodName()); + UTIL.createTable(table, Bytes.toBytes(cfName)); + + } + + @After + public void shutdown() throws IOException { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testInitWriter() throws Exception { + store = UTIL.getMiniHBaseCluster().getRegionServer(0). + getRegions(table).get(0).getStores().get(0); + DefaultCompactor compactor = new DefaultCompactor(config, store); + mockFileDetails = mock(Compactor.FileDetails.class); + StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); + Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp"); + assertEquals(new Path(tmpPath, cfName), writer.getPath().getParent()); + } + + @Test + public void testCommitCompaction() throws Exception { + //Performs a put, then flush to create a valid store file + Table tbl = UTIL.getConnection().getTable(table); + Put put = new Put(Bytes.toBytes("row1")); + put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("1"), Bytes.toBytes("v1")); + tbl.put(put); + UTIL.flush(table); + store = UTIL.getMiniHBaseCluster().getRegionServer(0). 
+ getRegions(table).get(0).getStores().get(0); + //Will move the existing file into a tmp folder, + // so that we can use it later as parameter for Compactor.commitCompaction + Path filePath = null; + List tmpFilesList = new ArrayList<>(); + for(HStoreFile file : store.getStorefiles()){ + filePath = file.getPath(); + Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp"); + tmpPath = new Path(tmpPath, filePath.getName()); + store.getFileSystem().rename(filePath, tmpPath); + tmpFilesList.add(tmpPath); + break; + } + DefaultCompactor compactor = new DefaultCompactor(config, store); + //pass the renamed original file, then asserts it has the proper store dir path + List result = compactor.commitCompaction(mock(CompactionRequestImpl.class), tmpFilesList, null); + assertEquals(1, result.size()); + assertEquals(filePath, result.get(0).getPath()); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java new file mode 100644 index 000000000000..a6654ab2ab59 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static junit.framework.TestCase.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentCaptor; + +/** + * Test class for DirectInStoreCompactor. 
+ */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDirectInStoreCompactor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDirectInStoreCompactor.class); + + @Rule + public TestName name = new TestName(); + + private Configuration config = new Configuration(); + private HStore mockStore; + private String cfName = name.getMethodName()+"-CF"; + private Compactor.FileDetails mockFileDetails; + + @Before + public void setup() throws Exception { + Path filePath = new Path(name.getMethodName()); + mockStore = mock(HStore.class); + HRegionFileSystem mockRegionFS = mock(HRegionFileSystem.class); + when(mockStore.getRegionFileSystem()).thenReturn(mockRegionFS); + when(mockRegionFS.getRegionDir()).thenReturn(filePath); + when(mockStore.getColumnFamilyName()).thenReturn(cfName); + HFileContext mockFileContext = mock(HFileContext.class); + when(mockFileContext.getBytesPerChecksum()).thenReturn(100); + when(mockStore.createFileContext(isNull(), anyBoolean(), + anyBoolean(), isNull())).thenReturn(mockFileContext); + when(mockStore.getHRegion()).thenReturn(mock(HRegion.class)); + ColumnFamilyDescriptor mockDesc = mock(ColumnFamilyDescriptor.class); + when(mockDesc.getBloomFilterType()).thenReturn(BloomType.NONE); + when(mockDesc.getNameAsString()).thenReturn(cfName); + when(mockStore.getColumnFamilyDescriptor()).thenReturn(mockDesc); + FileSystem mockFS = mock(FileSystem.class); + when(mockFS.exists(any(Path.class))).thenReturn(true); + FileStatus mockFileStatus = mock(FileStatus.class); + when(mockFileStatus.isDirectory()).thenReturn(true); + when(mockFS.getFileStatus(any(Path.class))).thenReturn(mockFileStatus); + when(mockStore.getFileSystem()).thenReturn(mockFS); + when(mockFS.getConf()).thenReturn(config); + when(mockFS.create(any(Path.class), any(FsPermission.class), any(Boolean.class), + any(Integer.class), any(Short.class), any(Long.class), any())) + .thenReturn(mock(FSDataOutputStream.class)); + CacheConfig mockCacheConfig = mock(CacheConfig.class); + when(mockCacheConfig.getByteBuffAllocator()).thenReturn(mock(ByteBuffAllocator.class)); + when(mockStore.getCacheConfig()).thenReturn(mockCacheConfig); + when(mockFileContext.getEncryptionContext()).thenReturn(Encryption.Context.NONE); + when(mockFileContext.getCompression()).thenReturn(Compression.Algorithm.NONE); + when(mockFileContext.getChecksumType()).thenReturn(ChecksumType.NULL); + when(mockFileContext.getCellComparator()).thenReturn(mock(CellComparator.class)); + when(mockStore.getRegionInfo()).thenReturn(mock(RegionInfo.class)); + this.mockFileDetails = mock(Compactor.FileDetails.class); + } + + @Test + public void testInitWriter() throws Exception { + DirectInStoreCompactor compactor = new DirectInStoreCompactor(config, mockStore); + StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); + Path filePath = new Path(name.getMethodName()); + assertEquals(new Path(filePath, cfName), writer.getPath().getParent()); + } + + @Test + public void testCreateFileInStoreDir() throws Exception { + HStoreFile mockFile = mock(HStoreFile.class); + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + when(mockStore.createStoreFileAndReader(pathCaptor.capture())).thenReturn(mockFile); + DirectInStoreCompactor compactor = new DirectInStoreCompactor(config, mockStore); + StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); + compactor.createFileInStoreDir(writer.getPath()); + assertEquals(writer.getPath(), 
pathCaptor.getValue()); + } +} From bbb851efe1ede55569d5a0edfc3eb846f6ed8b65 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 15 Jun 2021 16:38:44 +0100 Subject: [PATCH 02/12] checkstyles Change-Id: Iff62fd5d402429115b743dd22293c357cbf1a29e --- .../regionserver/compactions/Compactor.java | 17 ++++---- .../compactions/DefaultCompactor.java | 3 +- .../compactions/DirectInStoreCompactor.java | 40 ++++++++++++------- .../hadoop/hbase/regionserver/TestHStore.java | 1 - .../compactions/TestDefaultCompactor.java | 12 +++--- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index d4db75c5a20e..ec47430c86dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -268,6 +268,7 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s * @throws IOException if creation failed * @deprecated Use initWriter instead. */ + @Deprecated protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { // When all MVCC readpoints are 0, don't write them. @@ -283,11 +284,11 @@ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDr * Default method for initializing a StoreFileWriter in the compaction process, this creates the * resulting files on a temp directory. Therefore, upon compaction commit time, these files * should be renamed into the actual store dir. - * @param fd - * @param shouldDropBehind - * @param major + * @param fd the file details. + * @param shouldDropBehind boolean for the drop-behind output stream cache settings. + * @param major if compaction is major. * @return Writer for a new StoreFile in the tmp dir. - * @throws IOException + * @throws IOException if it fails to initialise the writer. */ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { @@ -556,10 +557,10 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, * then create a reader and cache it into the store. * @param cr the compaction request. * @param newFiles the new files created by this compaction under a temp dir. - * @param user the running user/ + * @param user the running user. * @return A list of the resulting store files already placed in the store dir and loaded into the - * store cache. - * @throws IOException + * store cache. + * @throws IOException if the commit fails. */ public List commitCompaction(CompactionRequestImpl cr, List newFiles, User user) throws IOException { @@ -584,7 +585,7 @@ public List commitCompaction(CompactionRequestImpl cr, List ne * HStoreFile instance * @param newFile the new file created. * @return an HStoreFile instance. - * @throws IOException + * @throws IOException if the file store creation fails. */ protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { Path destPath = this.store.getRegionFileSystem(). 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 108aea67d9cd..bd0bd0eb9b0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -50,8 +50,7 @@ public DefaultCompactor(Configuration conf, HStore store) { private final CellSinkFactory writerFactory = new CellSinkFactory() { @Override - public StoreFileWriter createWriter(InternalScanner scanner, - org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, + public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { return initWriter(fd, shouldDropBehind, major); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java index c606321216de..5e1e77581784 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java @@ -1,5 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hbase.regionserver.compactions; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.compress.Compression; @@ -8,23 +29,14 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; -import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.function.Supplier; - @InterfaceAudience.Private public class DirectInStoreCompactor extends DefaultCompactor { public DirectInStoreCompactor(Configuration conf, HStore store) { super(conf, store); } - @Override protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { @@ -35,9 +47,9 @@ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, b fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind); } - private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, Compression.Algorithm compression, - boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind) - throws IOException { + private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, + Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, + boolean shouldDropBehind) throws IOException { final CacheConfig writerCacheConf; // Don't cache data on write on compactions. writerCacheConf = new CacheConfig(store.getCacheConfig()); @@ -48,8 +60,8 @@ private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, Compression.Al favoredNodes = store.getHRegion().getRegionServerServices().getFavoredNodesForRegion( store.getHRegion().getRegionInfo().getEncodedName()); } - HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, includesTag, - store.getCryptoContext()); + HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, + includesTag, store.getCryptoContext()); Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(), store.getColumnFamilyDescriptor().getNameAsString()); StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 4811d422655f..f7b1c062a158 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -117,7 +117,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java index 64a0f464ca5b..63e7c353761e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java @@ -19,6 +19,11 @@ import static junit.framework.TestCase.assertEquals; import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -40,10 +45,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - /** * Test class for DirectInStoreCompactor. */ @@ -113,7 +114,8 @@ public void testCommitCompaction() throws Exception { } DefaultCompactor compactor = new DefaultCompactor(config, store); //pass the renamed original file, then asserts it has the proper store dir path - List result = compactor.commitCompaction(mock(CompactionRequestImpl.class), tmpFilesList, null); + List result = compactor.commitCompaction(mock(CompactionRequestImpl.class), + tmpFilesList, null); assertEquals(1, result.size()); assertEquals(filePath, result.get(0).getPath()); } From a0174221736b921ed138fab13f4337211fbe758c Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 21 Jun 2021 20:46:02 +0100 Subject: [PATCH 03/12] addressing review comments Change-Id: I5ac86a27c87e956926860ad9fb3eaeed3be75d00 --- .../hbase/regionserver/compactions/TestDefaultCompactor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java index 63e7c353761e..21873d73a8a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java @@ -46,7 +46,7 @@ import org.junit.rules.TestName; /** - * Test class for DirectInStoreCompactor. + * Test class for DefaultCompactor. 
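+ * Exercises both the tmp-dir writer initialization and the rename-based
+ * commit against a mini-cluster.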
*/ @Category({ RegionServerTests.class, MediumTests.class }) public class TestDefaultCompactor { From 4f7aba73d833af46c7ddec5be4f91ac285e64cd2 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 22 Jun 2021 14:42:27 +0100 Subject: [PATCH 04/12] Addressing review comments from Z York Change-Id: I7a79666b8856715b1ad4421162d2b4658c213f4d --- .../hadoop/hbase/io/hfile/CacheConfig.java | 24 +++++++++++++++++++ ...text.java => DirectStoreFlushContext.java} | 8 +++---- ...reFlusher.java => DirectStoreFlusher.java} | 6 ++--- .../hadoop/hbase/regionserver/HStore.java | 5 +--- .../regionserver/compactions/Compactor.java | 2 +- ...mpactor.java => DirectStoreCompactor.java} | 12 +++++----- ....java => TestDirectStoreFlushContext.java} | 8 +++---- ...usher.java => TestDirectStoreFlusher.java} | 8 +++---- ...tor.java => TestDirectStoreCompactor.java} | 10 ++++---- 9 files changed, 52 insertions(+), 31 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{DirectInStoreFlushContext.java => DirectStoreFlushContext.java} (89%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/{DirectInStoreFlusher.java => DirectStoreFlusher.java} (95%) rename hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/{DirectInStoreCompactor.java => DirectStoreCompactor.java} (90%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestDirectInStoreFlushContext.java => TestDirectStoreFlushContext.java} (92%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/{TestDirectInStoreFlusher.java => TestDirectStoreFlusher.java} (96%) rename hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/{TestDirectInStoreCompactor.java => TestDirectStoreCompactor.java} (94%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index dcbb71582f44..ff65f5113dbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -297,6 +297,30 @@ public void enableCacheOnWrite() { this.cacheBloomsOnWrite = true; } + /** + * If hbase.rs.cachecompactedblocksonwrite configuration is set to true and + * 'totalCompactedFilesSize' is lower than 'cacheCompactedDataOnWriteThreshold', + * enables cache on write for below properties: + * - cacheDataOnWrite + * - cacheIndexesOnWrite + * - cacheBloomsOnWrite + * + * Otherwise, sets 'cacheDataOnWrite' only to false. + * + * @param totalCompactedFilesSize the total size of compacted files. + * @return true if the checks mentioned above pass and the cache is enabled, false otherwise. 
+ */ + public boolean enableCacheOnWrite(long totalCompactedFilesSize) { + if (shouldCacheCompactedBlocksOnWrite() && totalCompactedFilesSize <= + getCacheCompactedBlocksOnWriteThreshold()) { + enableCacheOnWrite(); + return true; + } else { + setCacheDataOnWrite(false); + return false; + } + } + /** * @return true if index blocks should be written to the cache when an HFile * is written, false if not diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java similarity index 89% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java index 874996bba84d..63d5e9cab979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlushContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java @@ -30,14 +30,14 @@ * directly in the store dir, and therefore, doesn't perform a rename from tmp dir * into the store dir. * - * To be used only when PersistedStoreEngine is configured as the StoreEngine implementation. + * To be used only when DirectStoreFlushContext is configured as the StoreEngine implementation. */ @InterfaceAudience.Private -public class DirectInStoreFlushContext extends DefaultStoreFlushContext { +public class DirectStoreFlushContext extends DefaultStoreFlushContext { - private static final Logger LOG = LoggerFactory.getLogger(DirectInStoreFlushContext.class); + private static final Logger LOG = LoggerFactory.getLogger(DirectStoreFlushContext.class); - public DirectInStoreFlushContext(HStore store, Long cacheFlushSeqNum, + public DirectStoreFlushContext(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { super.init(store, cacheFlushSeqNum, tracker); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java similarity index 95% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java index ba52c7518fbb..a91fb42829f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectInStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java @@ -33,12 +33,12 @@ /** * A StoreFlusher that writes hfiles directly into the actual store directory, - * instead of a temp dir.. + * instead of a temp dir. 
*/ @InterfaceAudience.Private -public class DirectInStoreFlusher extends DefaultStoreFlusher { +public class DirectStoreFlusher extends DefaultStoreFlusher { - public DirectInStoreFlusher(Configuration conf, HStore store) { + public DirectStoreFlusher(Configuration conf, HStore store) { super(conf, store); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index e55f03cc200b..029072de3386 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1151,16 +1151,13 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm // if data blocks are to be cached on write // during compaction, we should forcefully // cache index and bloom blocks as well - if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf - .getCacheCompactedBlocksOnWriteThreshold()) { - writerCacheConf.enableCacheOnWrite(); + if (writerCacheConf.enableCacheOnWrite(totalCompactedFilesSize)) { if (!cacheOnWriteLogged) { LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); cacheOnWriteLogged = true; } } else { - writerCacheConf.setCacheDataOnWrite(false); if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { // checking condition once again for logging LOG.debug( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index ec47430c86dd..3b0cf10f909e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -135,7 +135,7 @@ protected static class FileDetails { /** Min SeqId to keep during a major compaction **/ public long minSeqIdToKeep = 0; /** Total size of the compacted files **/ - private long totalCompactedFilesSize = 0; + public long totalCompactedFilesSize = 0; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java similarity index 90% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index 5e1e77581784..6d491d88c471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectInStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -32,8 +32,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class DirectInStoreCompactor extends DefaultCompactor { - public DirectInStoreCompactor(Configuration conf, HStore store) { +public class DirectStoreCompactor extends DefaultCompactor { + public DirectStoreCompactor(Configuration conf, HStore store) { super(conf, store); } @@ -44,17 +44,17 @@ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, b // See HBASE-8166, HBASE-12600, and HBASE-13389. 
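+    // fd.maxMVCCReadpoint == 0 means no input cell needs its MVCC read point preserved,
+    // so the per-cell sequence id can be omitted from the compacted output entirely.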
return createWriterInFamilyDir(fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression, - fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind); + fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, + shouldDropBehind, fd.totalCompactedFilesSize); } private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind) throws IOException { + boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { final CacheConfig writerCacheConf; // Don't cache data on write on compactions. writerCacheConf = new CacheConfig(store.getCacheConfig()); - writerCacheConf.setCacheDataOnWrite(false); - + writerCacheConf.enableCacheOnWrite(totalCompactedFilesSize); InetSocketAddress[] favoredNodes = null; if (store.getHRegion().getRegionServerServices() != null) { favoredNodes = store.getHRegion().getRegionServerServices().getFavoredNodesForRegion( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java similarity index 92% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java index 9f5c75b9c2ac..b4b22263d3d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlushContext.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java @@ -37,14 +37,14 @@ import org.mockito.ArgumentCaptor; /** - * Test class for DirectInStoreFlushContext + * Test class for DirectStoreFlushContext */ @Category({ RegionServerTests.class, MediumTests.class }) -public class TestDirectInStoreFlushContext { +public class TestDirectStoreFlushContext { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDirectInStoreFlushContext.class); + HBaseClassTestRule.forClass(TestDirectStoreFlushContext.class); @Rule public TestName name = new TestName(); @@ -62,7 +62,7 @@ public void testCommit() throws Exception { HStoreFile mockStoreFile = mock(HStoreFile.class); when(mockStoreFile.getReader()).thenReturn(mock(StoreFileReader.class)); when(mockStore.createStoreFileAndReader(captor.capture())).thenReturn(mockStoreFile); - DirectInStoreFlushContext context = new DirectInStoreFlushContext(mockStore, + DirectStoreFlushContext context = new DirectStoreFlushContext(mockStore, 0L, FlushLifeCycleTracker.DUMMY); context.tempFiles = new ArrayList<>(); context.tempFiles.add(filePath); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java similarity index 96% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java index 7c4874868b93..0d7cb789d5bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectInStoreFlusher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java @@ -53,14 +53,14 @@ import org.junit.rules.TestName; /** - * Test class for DirectInStoreFlusher + * Test class for 
DirectStoreFlusher */ @Category({ RegionServerTests.class, MediumTests.class }) -public class TestDirectInStoreFlusher { +public class TestDirectStoreFlusher { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDirectInStoreFlusher.class); + HBaseClassTestRule.forClass(TestDirectStoreFlusher.class); @Rule public TestName name = new TestName(); @@ -115,7 +115,7 @@ public void setup() throws Exception { @Test public void testCreateWriter() throws Exception { - DirectInStoreFlusher flusher = new DirectInStoreFlusher(config, mockStore); + DirectStoreFlusher flusher = new DirectStoreFlusher(config, mockStore); List files = flusher.flushSnapshot(mockSnapshot, 0, mock(MonitoredTask.class), null, FlushLifeCycleTracker.DUMMY); assertEquals(1, files.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java similarity index 94% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java index a6654ab2ab59..fa4b1258d4d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectInStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java @@ -56,14 +56,14 @@ import org.mockito.ArgumentCaptor; /** - * Test class for DirectInStoreCompactor. + * Test class for DirectStoreCompactor. */ @Category({ RegionServerTests.class, MediumTests.class }) -public class TestDirectInStoreCompactor { +public class TestDirectStoreCompactor { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDirectInStoreCompactor.class); + HBaseClassTestRule.forClass(TestDirectStoreCompactor.class); @Rule public TestName name = new TestName(); @@ -113,7 +113,7 @@ public void setup() throws Exception { @Test public void testInitWriter() throws Exception { - DirectInStoreCompactor compactor = new DirectInStoreCompactor(config, mockStore); + DirectStoreCompactor compactor = new DirectStoreCompactor(config, mockStore); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); Path filePath = new Path(name.getMethodName()); assertEquals(new Path(filePath, cfName), writer.getPath().getParent()); @@ -124,7 +124,7 @@ public void testCreateFileInStoreDir() throws Exception { HStoreFile mockFile = mock(HStoreFile.class); ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); when(mockStore.createStoreFileAndReader(pathCaptor.capture())).thenReturn(mockFile); - DirectInStoreCompactor compactor = new DirectInStoreCompactor(config, mockStore); + DirectStoreCompactor compactor = new DirectStoreCompactor(config, mockStore); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); compactor.createFileInStoreDir(writer.getPath()); assertEquals(writer.getPath(), pathCaptor.getValue()); From 440b66a70dd443e1a266753079dab914fe9171cc Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 23 Jun 2021 22:45:38 +0100 Subject: [PATCH 05/12] More review suggestions Change-Id: I26d10e3cb52a14bc82f13647ff9921c92282a353 --- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 6 +++--- .../apache/hadoop/hbase/regionserver/HStore.java | 6 +++--- 
.../hbase/regionserver/compactions/Compactor.java | 2 +- .../compactions/DirectStoreCompactor.java | 13 ++++--------- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index ff65f5113dbd..e250a522a3e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -291,7 +291,7 @@ public void setCacheDataOnWrite(boolean cacheDataOnWrite) { * cacheIndexesOnWrite * cacheBloomsOnWrite */ - public void enableCacheOnWrite() { + public void enableCacheOnWriteForCompactions() { this.cacheDataOnWrite = true; this.cacheIndexesOnWrite = true; this.cacheBloomsOnWrite = true; @@ -310,10 +310,10 @@ public void enableCacheOnWrite() { * @param totalCompactedFilesSize the total size of compacted files. * @return true if the checks mentioned above pass and the cache is enabled, false otherwise. */ - public boolean enableCacheOnWrite(long totalCompactedFilesSize) { + public boolean enableCacheOnWriteForCompactions(long totalCompactedFilesSize) { if (shouldCacheCompactedBlocksOnWrite() && totalCompactedFilesSize <= getCacheCompactedBlocksOnWriteThreshold()) { - enableCacheOnWrite(); + enableCacheOnWriteForCompactions(); return true; } else { setCacheDataOnWrite(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 029072de3386..d81a2eab848d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -437,7 +437,7 @@ public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) { return ttl; } - StoreContext getStoreContext() { + public StoreContext getStoreContext() { return storeContext; } @@ -1151,7 +1151,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm // if data blocks are to be cached on write // during compaction, we should forcefully // cache index and bloom blocks as well - if (writerCacheConf.enableCacheOnWrite(totalCompactedFilesSize)) { + if (writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize)) { if (!cacheOnWriteLogged) { LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); @@ -1170,7 +1170,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm } else { final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); if (shouldCacheDataOnWrite) { - writerCacheConf.enableCacheOnWrite(); + writerCacheConf.enableCacheOnWriteForCompactions(); if (!cacheOnWriteLogged) { LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " + "Index blocks and Bloom filter blocks", this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 3b0cf10f909e..ea93c512f075 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -559,7 +559,7 @@ protected InternalScanner 
createScanner(HStore store, ScanInfo scanInfo, * @param newFiles the new files created by this compaction under a temp dir. * @param user the running user. * @return A list of the resulting store files already placed in the store dir and loaded into the - * store cache. + * store cache. * @throws IOException if the commit fails. */ public List commitCompaction(CompactionRequestImpl cr, List newFiles, User user) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index 6d491d88c471..00257ead0417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -52,14 +52,9 @@ private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { final CacheConfig writerCacheConf; - // Don't cache data on write on compactions. writerCacheConf = new CacheConfig(store.getCacheConfig()); - writerCacheConf.enableCacheOnWrite(totalCompactedFilesSize); - InetSocketAddress[] favoredNodes = null; - if (store.getHRegion().getRegionServerServices() != null) { - favoredNodes = store.getHRegion().getRegionServerServices().getFavoredNodesForRegion( - store.getHRegion().getRegionInfo().getEncodedName()); - } + writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize); + InetSocketAddress[] favoredNodes = store.getStoreContext().getFavoredNodes(); HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, includesTag, store.getCryptoContext()); Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(), @@ -80,8 +75,8 @@ private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, * Overrides Compactor original implementation, assuming the passed file is already in the store * directory, thus it only creates the related HStoreFile for the passed Path. * @param newFile the new file created. - * @return - * @throws IOException + * @return HStoreFile reference for the newly created file. + * @throws IOException if any error occurs. 
*/ @Override protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { From 10586d845815a15393332e6dad775fb538fcd254 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 5 Jul 2021 11:35:04 +0100 Subject: [PATCH 06/12] method rename Change-Id: Ifefdb84c0a5cd93e99a3777063d164708732943b --- .../java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java | 4 ++-- .../java/org/apache/hadoop/hbase/regionserver/HStore.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index e250a522a3e8..9277cc85a41d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -291,7 +291,7 @@ public void setCacheDataOnWrite(boolean cacheDataOnWrite) { * cacheIndexesOnWrite * cacheBloomsOnWrite */ - public void enableCacheOnWriteForCompactions() { + public void enableCacheOnWrite() { this.cacheDataOnWrite = true; this.cacheIndexesOnWrite = true; this.cacheBloomsOnWrite = true; @@ -313,7 +313,7 @@ public void enableCacheOnWriteForCompactions() { public boolean enableCacheOnWriteForCompactions(long totalCompactedFilesSize) { if (shouldCacheCompactedBlocksOnWrite() && totalCompactedFilesSize <= getCacheCompactedBlocksOnWriteThreshold()) { - enableCacheOnWriteForCompactions(); + enableCacheOnWrite(); return true; } else { setCacheDataOnWrite(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index d81a2eab848d..43a2195ec873 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1170,7 +1170,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm } else { final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); if (shouldCacheDataOnWrite) { - writerCacheConf.enableCacheOnWriteForCompactions(); + writerCacheConf.enableCacheOnWrite(); if (!cacheOnWriteLogged) { LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " + "Index blocks and Bloom filter blocks", this); From 948b6e1b1b4a34fc2ea712e1bdefe1407f5f1500 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 5 Jul 2021 15:48:26 +0100 Subject: [PATCH 07/12] Addressing Stack's review comments Change-Id: I5452de8215403ef76091d814778ca74046fa2cbd --- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hadoop/hbase/regionserver/HStore.java | 6 ------ .../regionserver/compactions/Compactor.java | 2 +- .../compactions/DirectStoreCompactor.java | 21 ++++++++++++++++--- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7c99069d632a..5a429aa4c693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1372,7 +1372,7 @@ public RegionInfo getRegionInfo() { * @return Instance of {@link RegionServerServices} used by this HRegion. * Can be null.
*/ - public RegionServerServices getRegionServerServices() { + RegionServerServices getRegionServerServices() { return this.rsServices; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 43a2195ec873..7e4ce0668124 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -228,12 +228,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, private long blockingFileCount; private int compactionCheckMultiplier; - public Encryption.Context getCryptoContext() { - return cryptoContext; - } - - protected Encryption.Context cryptoContext = Encryption.Context.NONE; - AtomicLong flushedCellsCount = new AtomicLong(); private AtomicLong compactedCellsCount = new AtomicLong(); private AtomicLong majorCompactedCellsCount = new AtomicLong(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index ea93c512f075..86d7cb2a51f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -292,7 +292,7 @@ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDr */ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { - return this.createTmpWriter(fd, shouldDropBehind, major); + return createTmpWriter(fd, shouldDropBehind, major); } protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index 00257ead0417..e04f57027414 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -31,24 +31,39 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.yetus.audience.InterfaceAudience; +/** + * Alternative Compactor implementation, this class extends DefaultCompactor class, + * modifying original behaviour of initWriter and createFileInStoreDir + * methods to create compacted files in the final store directory, rather than temp, avoiding the + * need to perform renames at compaction commit time. + */ @InterfaceAudience.Private public class DirectStoreCompactor extends DefaultCompactor { public DirectStoreCompactor(Configuration conf, HStore store) { super(conf, store); } + /** + * Overrides Compactor original implementation to create the resulting file directly + * in the store dir, rather than temp, in order to avoid the need for rename at commit time. + * @param fd the file details. + * @param shouldDropBehind boolean for the drop-behind output stream cache settings. + * @param major if compaction is major. + * @return + * @throws IOException + */ @Override protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) throws IOException { // When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389. - return createWriterInFamilyDir(fd.maxKeyCount, + return createWriterInStoreDir(fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression, fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize); } - private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, + private StoreFileWriter createWriterInStoreDir(long maxKeyCount, Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { final CacheConfig writerCacheConf; @@ -56,7 +71,7 @@ private StoreFileWriter createWriterInFamilyDir(long maxKeyCount, writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize); InetSocketAddress[] favoredNodes = store.getStoreContext().getFavoredNodes(); HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, - includesTag, store.getCryptoContext()); + includesTag, store.getStoreContext().getEncryptionContext()); Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(), store.getColumnFamilyDescriptor().getNameAsString()); StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, From 304226c3a85ea597f85ff8f8614ff2ee93a5e563 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 6 Jul 2021 21:05:06 +0100 Subject: [PATCH 08/12] fixing NPE on TestDirectStoreCompactor Change-Id: I40ca4a799f9f4a4e406f34a637cc75ec765a21d9 --- .../regionserver/compactions/TestDirectStoreCompactor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java index fa4b1258d4d1..366433955929 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreContext; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -86,6 +87,8 @@ public void setup() throws Exception { when(mockStore.createFileContext(isNull(), anyBoolean(), anyBoolean(), isNull())).thenReturn(mockFileContext); when(mockStore.getHRegion()).thenReturn(mock(HRegion.class)); + when(mockStore.getStoreContext()).thenReturn(new StoreContext.Builder(). + withFavoredNodesSupplier(()-> null).build()); ColumnFamilyDescriptor mockDesc = mock(ColumnFamilyDescriptor.class); when(mockDesc.getBloomFilterType()).thenReturn(BloomType.NONE); when(mockDesc.getNameAsString()).thenReturn(cfName); From da72e28b21f566acf6e3319d3fdeb1fa60e5a5f8 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 19 Jul 2021 15:00:55 +0100 Subject: [PATCH 09/12] Addressing the need to expose HStore.createStoreFileAndReader.
Change-Id: I618ba247caf013c30eb54ea9bf0e36d0d1baabe3 --- .../hadoop/hbase/regionserver/HStore.java | 11 +++++++++-- .../regionserver/compactions/Compactor.java | 14 +++++++++----- .../compactions/DirectStoreCompactor.java | 7 +++++-- .../hadoop/hbase/regionserver/TestHStore.java | 4 +++- .../compactions/TestDefaultCompactor.java | 16 +++++++--------- .../compactions/TestDirectStoreCompactor.java | 17 +++++++++++++---- 6 files changed, 46 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7e4ce0668124..4afb83b55088 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -694,7 +694,7 @@ private void refreshStoreFilesInternal(Collection newFiles) throw refreshStoreSizeAndTotalBytes(); } - public HStoreFile createStoreFileAndReader(final Path p) throws IOException { + HStoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p, isPrimaryReplicaStore()); return createStoreFileAndReader(info); @@ -1505,7 +1505,14 @@ protected List doCompaction(CompactionRequestImpl cr, List newFiles) throws IOException { // Do the steps necessary to complete the compaction. setStoragePolicyFromFileName(newFiles); - List sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user); + List sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user, + p -> { + try { + return this.createStoreFileAndReader((Path) p); + }catch(IOException e){ + throw new RuntimeException(e); + } + }); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); if (cr.isMajor()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 86d7cb2a51f5..9050be23a9d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -558,18 +559,19 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, * @param cr the compaction request. * @param newFiles the new files created by this compaction under a temp dir. * @param user the running user. + * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. * @return A list of the resulting store files already placed in the store dir and loaded into the * store cache. * @throws IOException if the commit fails. 
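 * <p>A minimal illustrative call, mirroring the HStore.doCompaction caller shown
 * above in this patch (the compactor, cr, newFiles, user and store names are
 * assumed for the sketch, not part of this method's contract):
 * <pre>{@code
 * List<HStoreFile> sfs = compactor.commitCompaction(cr, newFiles, user,
 *   p -> {
 *     try {
 *       return store.createStoreFileAndReader((Path) p);
 *     } catch (IOException e) {
 *       throw new RuntimeException(e);
 *     }
 *   });
 * }</pre>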
*/ - public List commitCompaction(CompactionRequestImpl cr, List newFiles, User user) - throws IOException { + public List commitCompaction(CompactionRequestImpl cr, List newFiles, + User user, Function fileAcessor) throws IOException { List sfs = new ArrayList<>(newFiles.size()); for (Path newFile : newFiles) { assert newFile != null; this.store.validateStoreFile(newFile); // Move the file into the right spot - HStoreFile sf = createFileInStoreDir(newFile); + HStoreFile sf = createFileInStoreDir(newFile, fileAcessor); if (this.store.getCoprocessorHost() != null) { this.store.getCoprocessorHost().postCompact(this.store, sf, cr.getTracker(), cr, user); } @@ -584,12 +586,14 @@ public List commitCompaction(CompactionRequestImpl cr, List ne * Moves the new file from temp to the actual store directory, then create the related * HStoreFile instance * @param newFile the new file created. + * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. * @return an HStoreFile instance. * @throws IOException if the file store creation fails. */ - protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { + protected HStoreFile createFileInStoreDir(Path newFile, Function fileAcessor) + throws IOException { Path destPath = this.store.getRegionFileSystem(). commitStoreFile(this.store.getColumnFamilyName(), newFile); - return this.store.createStoreFileAndReader(destPath); + return fileAcessor.apply(destPath); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index e04f57027414..ff132ed23632 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -90,11 +91,13 @@ private StoreFileWriter createWriterInStoreDir(long maxKeyCount, * Overrides Compactor original implementation, assuming the passed file is already in the store * directory, thus it only creates the related HStoreFile for the passed Path. * @param newFile the new file created. + * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. * @return HStoreFile reference for the newly created file. * @throws IOException if any error occurs. 
*/ @Override - protected HStoreFile createFileInStoreDir(Path newFile) throws IOException { - return this.store.createStoreFileAndReader(newFile); + protected HStoreFile createFileInStoreDir(Path newFile, + Function fileAcessor) throws IOException { + return fileAcessor.apply(newFile); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index f7b1c062a158..817c36da1558 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -50,6 +50,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -1970,7 +1972,7 @@ public DummyCompactor(Configuration conf, HStore store) { @Override public List commitCompaction(CompactionRequestImpl cr, - List newFiles, User user){ + List newFiles, User user, Function fileAccessor){ countDownLatch.countDown(); return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java index 21873d73a8a8..149a28ad9445 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java @@ -104,18 +104,16 @@ public void testCommitCompaction() throws Exception { // so that we can use it later as parameter for Compactor.commitCompaction Path filePath = null; List tmpFilesList = new ArrayList<>(); - for(HStoreFile file : store.getStorefiles()){ - filePath = file.getPath(); - Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp"); - tmpPath = new Path(tmpPath, filePath.getName()); - store.getFileSystem().rename(filePath, tmpPath); - tmpFilesList.add(tmpPath); - break; - } + final HStoreFile file = (HStoreFile)store.getStorefiles().toArray()[0]; + filePath = file.getPath(); + Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp"); + tmpPath = new Path(tmpPath, filePath.getName()); + store.getFileSystem().rename(filePath, tmpPath); + tmpFilesList.add(tmpPath); DefaultCompactor compactor = new DefaultCompactor(config, store); //pass the renamed original file, then asserts it has the proper store dir path List result = compactor.commitCompaction(mock(CompactionRequestImpl.class), - tmpFilesList, null); + tmpFilesList, null, p -> file); assertEquals(1, result.size()); assertEquals(filePath, result.get(0).getPath()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java index 366433955929..0f52e8c3d6d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.when; +import org.apache.commons.io.file.AccumulatorPathVisitor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -47,7 +49,10 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.test.PathUtils; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -56,6 +61,8 @@ import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; +import java.util.concurrent.atomic.LongAccumulator; + /** * Test class for DirectStoreCompactor. */ @@ -125,11 +132,13 @@ public void testInitWriter() throws Exception { @Test public void testCreateFileInStoreDir() throws Exception { HStoreFile mockFile = mock(HStoreFile.class); - ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); - when(mockStore.createStoreFileAndReader(pathCaptor.capture())).thenReturn(mockFile); + final StringBuilder builder = new StringBuilder(); DirectStoreCompactor compactor = new DirectStoreCompactor(config, mockStore); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); - compactor.createFileInStoreDir(writer.getPath()); - assertEquals(writer.getPath(), pathCaptor.getValue()); + compactor.createFileInStoreDir(writer.getPath(), p -> { + builder.append(p.getParent().getName()); + return mockFile; + }); + assertEquals(writer.getPath().getParent().getName(), builder.toString()); } } From 9bd929f793bf66ba90f8c0f252ad28e74950bfda Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Tue, 20 Jul 2021 21:27:04 +0100 Subject: [PATCH 10/12] addressing Josh's review suggestions Change-Id: I120093272991de10d33f02903e7b5e51c6767978 --- .../hadoop/hbase/io/hfile/CacheConfig.java | 5 + .../hadoop/hbase/regionserver/HStore.java | 25 +---- .../hbase/regionserver/StoreEngine.java | 50 +++++++++ .../regionserver/compactions/Compactor.java | 16 ++- .../compactions/DefaultCompactor.java | 2 + .../compactions/DirectStoreCompactor.java | 22 ++-- .../hadoop/hbase/regionserver/TestHStore.java | 1 + .../compactions/TestDefaultCompactor.java | 8 +- .../compactions/TestDirectStoreCompactor.java | 102 +++++------------- 9 files changed, 117 insertions(+), 114 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 9277cc85a41d..1d702d0ddc61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -298,6 +298,11 @@ public void enableCacheOnWrite() { } /** + * Checks if write cache should be enabled for compactions. + * + * To be called only in the context of compactions; + * for flushes or other write operations, use enableCacheOnWrite.
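+ *
+ * <p>A minimal illustrative usage, matching the compaction writer setup elsewhere
+ * in this patch (the store and totalCompactedFilesSize names are assumed for the
+ * sketch):
+ * <pre>{@code
+ * CacheConfig writerCacheConf = new CacheConfig(store.getCacheConfig());
+ * if (writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize)) {
+ *   // data, index and bloom blocks will be cached as the compacted file is written
+ * }
+ * }</pre>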
+ * * If hbase.rs.cachecompactedblocksonwrite configuration is set to true and * 'totalCompactedFilesSize' is lower than 'cacheCompactedDataOnWriteThreshold', * enables cache on write for below properties: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 4afb83b55088..7a7c8f935a6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1189,29 +1189,10 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm return builder.build(); } - public HFileContext createFileContext(Compression.Algorithm compression, + HFileContext createFileContext(Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) { - if (compression == null) { - compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; - } - ColumnFamilyDescriptor family = getColumnFamilyDescriptor(); - HFileContext hFileContext = new HFileContextBuilder() - .withIncludesMvcc(includeMVCCReadpoint) - .withIncludesTags(includesTag) - .withCompression(compression) - .withCompressTags(family.isCompressTags()) - .withChecksumType(StoreUtils.getChecksumType(conf)) - .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) - .withBlockSize(family.getBlocksize()) - .withHBaseCheckSum(true) - .withDataBlockEncoding(family.getDataBlockEncoding()) - .withEncryptionContext(encryptionContext) - .withCreateTime(EnvironmentEdgeManager.currentTime()) - .withColumnFamily(getColumnFamilyDescriptor().getName()) - .withTableName(getTableName().getName()) - .withCellComparator(getComparator()) - .build(); - return hFileContext; + return storeEngine.createFileContext(compression, includeMVCCReadpoint, includesTag, + encryptionContext, getColumnFamilyDescriptor(), getTableName(), getComparator(), conf); } private long getTotalSize(Collection sfs) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 60b3c3d0d20f..0cf86b170cfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -24,9 +24,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -128,4 +136,46 @@ private void createComponentsOnce( throw new IOException("Unable to load configured store engine '" + className + "'", e); } } + + /** + * Constructs an HFileContext instance for the given store. 
+ * @param compression the Compression.Algorithm to be used. + * @param includeMVCCReadpoint whether MVCC read point is to be used. + * @param includesTag whether Tags should be used. + * @param encryptionContext the Encryption.Context to be used. + * @param familyDescriptor ColumnFamilyDescriptor with info about the column family. + * @param tableName the table name this store belongs to. + * @param cellComparator the CellComparator to be used. + * @param conf the cluster configuration. + * @return an HFileContext instance. + */ + public HFileContext createFileContext(Compression.Algorithm compression, + boolean includeMVCCReadpoint, boolean includesTag, + Encryption.Context encryptionContext, + ColumnFamilyDescriptor familyDescriptor, + TableName tableName, + CellComparator cellComparator, + Configuration conf) { + if (compression == null) { + compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; + } + ColumnFamilyDescriptor family = familyDescriptor; + HFileContext hFileContext = new HFileContextBuilder() + .withIncludesMvcc(includeMVCCReadpoint) + .withIncludesTags(includesTag) + .withCompression(compression) + .withCompressTags(family.isCompressTags()) + .withChecksumType(StoreUtils.getChecksumType(conf)) + .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) + .withBlockSize(family.getBlocksize()) + .withHBaseCheckSum(true) + .withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(encryptionContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()) + .withColumnFamily(family.getName()) + .withTableName(tableName.getName()) + .withCellComparator(cellComparator) + .build(); + return hFileContext; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9050be23a9d2..71886bb6f105 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -136,7 +136,13 @@ protected static class FileDetails { /** Min SeqId to keep during a major compaction **/ public long minSeqIdToKeep = 0; /** Total size of the compacted files **/ - public long totalCompactedFilesSize = 0; + private long totalCompactedFilesSize = 0; + + public long getTotalCompactedFilesSize() { + return totalCompactedFilesSize; + } + + } /** @@ -586,14 +592,14 @@ public List commitCompaction(CompactionRequestImpl cr, List ne * Moves the new file from temp to the actual store directory, then create the related * HStoreFile instance * @param newFile the new file created. - * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. + * @param fileAccessor a lambda expression with logic for loading a HStoreFile given a Path. * @return an HStoreFile instance. * @throws IOException if the file store creation fails. */ - protected HStoreFile createFileInStoreDir(Path newFile, Function fileAcessor) - throws IOException { + protected HStoreFile createFileInStoreDir(Path newFile, Function fileAccessor) + throws IOException { Path destPath = this.store.getRegionFileSystem().
commitStoreFile(this.store.getColumnFamilyName(), newFile); - return fileAcessor.apply(destPath); + return fileAccessor.apply(destPath); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index bd0bd0eb9b0c..272fba94c905 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -106,4 +107,5 @@ protected void abortWriter(StoreFileWriter writer) throws IOException { e); } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index e04f57027414..e508dab955dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -33,7 +33,10 @@ import org.apache.yetus.audience.InterfaceAudience; /** - * Alternative Compactor implementation, this class extends DefaultCompactor class, + * Alternative Compactor implementation that writes compacted files straight + * into the store directory. + * + * This class extends DefaultCompactor class, * modifying original behaviour of initWriter and createFileInStoreDir * methods to create compacted files in the final store directory, rather than temp, avoiding the * need to perform renames at compaction commit time. @@ -61,18 +64,19 @@ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, b return createWriterInStoreDir(fd.maxKeyCount, major ? majorCompactionCompression : minorCompactionCompression, fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, - shouldDropBehind, fd.totalCompactedFilesSize); + shouldDropBehind, fd.getTotalCompactedFilesSize()); } private StoreFileWriter createWriterInStoreDir(long maxKeyCount, Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag, boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException { - final CacheConfig writerCacheConf; - writerCacheConf = new CacheConfig(store.getCacheConfig()); + final CacheConfig writerCacheConf = new CacheConfig(store.getCacheConfig()); writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize); InetSocketAddress[] favoredNodes = store.getStoreContext().getFavoredNodes(); - HFileContext hFileContext = store.createFileContext(compression, includeMVCCReadpoint, - includesTag, store.getStoreContext().getEncryptionContext()); + HFileContext hFileContext = store.getStoreEngine().
+ createFileContext(compression, includeMVCCReadpoint,includesTag, + store.getStoreContext().getEncryptionContext(), store.getColumnFamilyDescriptor(), + store.getTableName(), store.getComparator(), conf); Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(), store.getColumnFamilyDescriptor().getNameAsString()); StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, @@ -91,13 +95,13 @@ private StoreFileWriter createWriterInStoreDir(long maxKeyCount, * Overrides Compactor original implementation, assuming the passed file is already in the store * directory, thus it only creates the related HStoreFile for the passed Path. * @param newFile the new file created. - * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. + * @param fileAccessor a lambda expression with logic for loading a HStoreFile given a Path. * @return HStoreFile reference for the newly created file. * @throws IOException if any error occurs. */ @Override protected HStoreFile createFileInStoreDir(Path newFile, - Function fileAcessor) throws IOException { - return fileAcessor.apply(newFile); + Function fileAccessor) throws IOException { + return fileAccessor.apply(newFile); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 817c36da1558..dfc77a7f0f53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1758,6 +1758,7 @@ public void testDoCompactionDelegatesCommit() throws Exception { try { store.doCompaction(mock(CompactionRequestImpl.class), null, mock(User.class), 0, files); + fail(); } catch(Throwable e){} assertEquals(0, DummyCompactor.countDownLatch.getCount()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java index 149a28ad9445..847d82693376 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java @@ -59,9 +59,7 @@ public class TestDefaultCompactor { public TestName name = new TestName(); private final Configuration config = new Configuration(); - private HStore store; private final String cfName = "cf"; - private Compactor.FileDetails mockFileDetails; private HBaseTestingUtility UTIL = new HBaseTestingUtility(); private TableName table; @@ -81,10 +79,10 @@ public void shutdown() throws IOException { @Test public void testInitWriter() throws Exception { - store = UTIL.getMiniHBaseCluster().getRegionServer(0). + HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0). 
getRegions(table).get(0).getStores().get(0); DefaultCompactor compactor = new DefaultCompactor(config, store); - mockFileDetails = mock(Compactor.FileDetails.class); + Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp"); assertEquals(new Path(tmpPath, cfName), writer.getPath().getParent()); @@ -98,7 +96,7 @@ public void testCommitCompaction() throws Exception { put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("1"), Bytes.toBytes("v1")); tbl.put(put); UTIL.flush(table); - store = UTIL.getMiniHBaseCluster().getRegionServer(0). + HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0). getRegions(table).get(0).getStores().get(0); //Will move the existing file into a tmp folder, // so that we can use it later as parameter for Compactor.commitCompaction diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java index 0f52e8c3d6d4..f3060cfe8416 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java @@ -18,50 +18,26 @@ package org.apache.hadoop.hbase.regionserver.compactions; import static junit.framework.TestCase.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import org.apache.commons.io.file.AccumulatorPathVisitor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; -import org.apache.hadoop.hbase.io.Reference; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.crypto.Encryption; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; -import org.apache.hadoop.hbase.regionserver.StoreContext; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.AtomicUtils; -import org.apache.hadoop.hbase.util.ChecksumType; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.test.PathUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; import 
org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.mockito.ArgumentCaptor; -import java.util.concurrent.atomic.LongAccumulator; +import java.io.IOException; /** * Test class for DirectStoreCompactor. @@ -76,64 +52,44 @@ public class TestDirectStoreCompactor { @Rule public TestName name = new TestName(); - private Configuration config = new Configuration(); - private HStore mockStore; - private String cfName = name.getMethodName()+"-CF"; - private Compactor.FileDetails mockFileDetails; + private final Configuration config = new Configuration(); + private final String cfName = "cf"; + + private HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private TableName table; @Before public void setup() throws Exception { - Path filePath = new Path(name.getMethodName()); - mockStore = mock(HStore.class); - HRegionFileSystem mockRegionFS = mock(HRegionFileSystem.class); - when(mockStore.getRegionFileSystem()).thenReturn(mockRegionFS); - when(mockRegionFS.getRegionDir()).thenReturn(filePath); - when(mockStore.getColumnFamilyName()).thenReturn(cfName); - HFileContext mockFileContext = mock(HFileContext.class); - when(mockFileContext.getBytesPerChecksum()).thenReturn(100); - when(mockStore.createFileContext(isNull(), anyBoolean(), - anyBoolean(), isNull())).thenReturn(mockFileContext); - when(mockStore.getHRegion()).thenReturn(mock(HRegion.class)); - when(mockStore.getStoreContext()).thenReturn(new StoreContext.Builder(). - withFavoredNodesSupplier(()-> null).build()); - ColumnFamilyDescriptor mockDesc = mock(ColumnFamilyDescriptor.class); - when(mockDesc.getBloomFilterType()).thenReturn(BloomType.NONE); - when(mockDesc.getNameAsString()).thenReturn(cfName); - when(mockStore.getColumnFamilyDescriptor()).thenReturn(mockDesc); - FileSystem mockFS = mock(FileSystem.class); - when(mockFS.exists(any(Path.class))).thenReturn(true); - FileStatus mockFileStatus = mock(FileStatus.class); - when(mockFileStatus.isDirectory()).thenReturn(true); - when(mockFS.getFileStatus(any(Path.class))).thenReturn(mockFileStatus); - when(mockStore.getFileSystem()).thenReturn(mockFS); - when(mockFS.getConf()).thenReturn(config); - when(mockFS.create(any(Path.class), any(FsPermission.class), any(Boolean.class), - any(Integer.class), any(Short.class), any(Long.class), any())) - .thenReturn(mock(FSDataOutputStream.class)); - CacheConfig mockCacheConfig = mock(CacheConfig.class); - when(mockCacheConfig.getByteBuffAllocator()).thenReturn(mock(ByteBuffAllocator.class)); - when(mockStore.getCacheConfig()).thenReturn(mockCacheConfig); - when(mockFileContext.getEncryptionContext()).thenReturn(Encryption.Context.NONE); - when(mockFileContext.getCompression()).thenReturn(Compression.Algorithm.NONE); - when(mockFileContext.getChecksumType()).thenReturn(ChecksumType.NULL); - when(mockFileContext.getCellComparator()).thenReturn(mock(CellComparator.class)); - when(mockStore.getRegionInfo()).thenReturn(mock(RegionInfo.class)); - this.mockFileDetails = mock(Compactor.FileDetails.class); + UTIL.startMiniCluster(); + table = TableName.valueOf(name.getMethodName()); + UTIL.createTable(table, Bytes.toBytes(cfName)); + + } + + @After + public void shutdown() throws IOException { + UTIL.shutdownMiniCluster(); } @Test public void testInitWriter() throws Exception { - DirectStoreCompactor compactor = new DirectStoreCompactor(config, mockStore); + HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0). 
+ getRegions(table).get(0).getStores().get(0); + DirectStoreCompactor compactor = new DirectStoreCompactor(config, store); + Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); - Path filePath = new Path(name.getMethodName()); - assertEquals(new Path(filePath, cfName), writer.getPath().getParent()); + //asserts the parent dir is the family dir itself, not .tmp + assertEquals(cfName, writer.getPath().getParent().getName()); } @Test public void testCreateFileInStoreDir() throws Exception { HStoreFile mockFile = mock(HStoreFile.class); final StringBuilder builder = new StringBuilder(); - DirectStoreCompactor compactor = new DirectStoreCompactor(config, mockStore); + HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0). + getRegions(table).get(0).getStores().get(0); + DirectStoreCompactor compactor = new DirectStoreCompactor(config, store); + Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class); StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false); compactor.createFileInStoreDir(writer.getPath(), p -> { builder.append(p.getParent().getName()); From ed21c9563f88d9e983945f87a64d2a6cc7d863fe Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 21 Jul 2021 09:52:51 +0100 Subject: [PATCH 11/12] fixing checkstyles and findbugs Change-Id: Ie439d3a0dc6d6e5006c37c72711b0f236aa38908 --- .../apache/hadoop/hbase/regionserver/HStore.java | 1 - .../hbase/regionserver/compactions/Compactor.java | 2 +- .../compactions/DefaultCompactor.java | 15 +++++++-------- .../compactions/DirectStoreCompactor.java | 4 ++-- .../hadoop/hbase/regionserver/TestHStore.java | 2 +- .../compactions/TestDirectStoreCompactor.java | 4 +++- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7a7c8f935a6a..70755cf5d815 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 71886bb6f105..9312133e720c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -567,7 +567,7 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, * @param user the running user. * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path. * @return A list of the resulting store files already placed in the store dir and loaded into the - * store cache. + * store cache. * @throws IOException if the commit fails. 
*/ public List commitCompaction(CompactionRequestImpl cr, List newFiles, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 272fba94c905..ac228a1b4532 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -49,13 +48,13 @@ public DefaultCompactor(Configuration conf, HStore store) { } private final CellSinkFactory writerFactory = - new CellSinkFactory() { - @Override - public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { - return initWriter(fd, shouldDropBehind, major); - } - }; + new CellSinkFactory() { + @Override + public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd, + boolean shouldDropBehind, boolean major) throws IOException { + return initWriter(fd, shouldDropBehind, major); + } + }; /** * Do a minor/major compaction on an explicit set of storefiles from a Store. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index e508dab955dd..8210146e7cca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -53,8 +53,8 @@ public DirectStoreCompactor(Configuration conf, HStore store) { * @param fd the file details. * @param shouldDropBehind boolean for the drop-behind output stream cache settings. * @param major if compaction is major. - * @return - * @throws IOException + * @return an instance of StoreFileWriter for the given file details. + * @throws IOException if any error occurs. 
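+ * <p>A short sketch of the expected behaviour (the compactor, fd and cfName names
+ * are assumed here, as exercised by TestDirectStoreCompactor below): the writer is
+ * created under the family dir itself rather than the region-level .tmp dir.
+ * <pre>{@code
+ * StoreFileWriter writer = compactor.initWriter(fd, false, false);
+ * assert cfName.equals(writer.getPath().getParent().getName());
+ * }</pre>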
*/ @Override protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index dfc77a7f0f53..82f5d50f7c37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1759,7 +1759,7 @@ public void testDoCompactionDelegatesCommit() throws Exception { store.doCompaction(mock(CompactionRequestImpl.class), null, mock(User.class), 0, files); fail(); - } catch(Throwable e){} + } catch(Exception e){} assertEquals(0, DummyCompactor.countDownLatch.getCount()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java index f3060cfe8416..0481b3299a48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java @@ -19,6 +19,9 @@ import static junit.framework.TestCase.assertEquals; import static org.mockito.Mockito.mock; + +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -37,7 +40,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import java.io.IOException; /** * Test class for DirectStoreCompactor. From db80e290c571384c9990b3b803ff2b98a69562b9 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 22 Jul 2021 14:33:06 +0100 Subject: [PATCH 12/12] adding our own functional interface for HStoreFile provision in Compactor Change-Id: I129da2790d72e0a1043f6359704bc9fc423bcc30 --- .../hadoop/hbase/regionserver/HStore.java | 8 +------- .../regionserver/compactions/Compactor.java | 20 +++++++++++++------ .../compactions/DirectStoreCompactor.java | 8 ++++---- .../hadoop/hbase/regionserver/TestHStore.java | 2 +- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 70755cf5d815..6ca6950154d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1486,13 +1486,7 @@ protected List doCompaction(CompactionRequestImpl cr, // Do the steps necessary to complete the compaction. 
setStoragePolicyFromFileName(newFiles);
       List sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user,
-        p -> {
-          try {
-            return this.createStoreFileAndReader((Path) p);
-          }catch(IOException e){
-            throw new RuntimeException(e);
-          }
-        });
+        p -> createStoreFileAndReader(p));
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
     if (cr.isMajor()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 9312133e720c..519f59a65cd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -565,19 +565,19 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
    * @param cr the compaction request.
    * @param newFiles the new files created by this compaction under a temp dir.
    * @param user the running user.
-   * @param fileAcessor a lambda expression with logic for loading a HStoreFile given a Path.
+   * @param fileProvider a lambda expression with logic for loading a HStoreFile given a Path.
    * @return A list of the resulting store files already placed in the store dir and loaded into the
    *   store cache.
    * @throws IOException if the commit fails.
    */
   public List commitCompaction(CompactionRequestImpl cr, List newFiles,
-      User user, Function fileAcessor) throws IOException {
+      User user, StoreFileProvider fileProvider) throws IOException {
     List sfs = new ArrayList<>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
       this.store.validateStoreFile(newFile);
       // Move the file into the right spot
-      HStoreFile sf = createFileInStoreDir(newFile, fileAcessor);
+      HStoreFile sf = createFileInStoreDir(newFile, fileProvider);
       if (this.store.getCoprocessorHost() != null) {
         this.store.getCoprocessorHost().postCompact(this.store, sf, cr.getTracker(), cr, user);
       }
@@ -592,14 +592,22 @@ public List commitCompaction(CompactionRequestImpl cr, List ne
    * Moves the new file from temp to the actual store directory, then create the related
    * HStoreFile instance
    * @param newFile the new file created.
-   * @param fileAccessor a lambda expression with logic for loading a HStoreFile given a Path.
+   * @param fileProvider a lambda expression with logic for creating a HStoreFile given a Path.
    * @return an HStoreFile instance.
    * @throws IOException if the file store creation fails.
    */
-  protected HStoreFile createFileInStoreDir(Path newFile, Function fileAccessor)
+  protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider)
     throws IOException {
     Path destPath = this.store.getRegionFileSystem().
       commitStoreFile(this.store.getColumnFamilyName(), newFile);
-    return fileAccessor.apply(destPath);
+    return fileProvider.createFile(destPath);
+  }
+
+  /**
+   * Functional interface to allow callers to provide the logic for creating StoreFiles.
+ */ + @FunctionalInterface + public interface StoreFileProvider { + HStoreFile createFile(Path path) throws IOException; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java index 8210146e7cca..bbaaade8a4a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java @@ -95,13 +95,13 @@ private StoreFileWriter createWriterInStoreDir(long maxKeyCount, * Overrides Compactor original implementation, assuming the passed file is already in the store * directory, thus it only creates the related HStoreFile for the passed Path. * @param newFile the new file created. - * @param fileAccessor a lambda expression with logic for loading a HStoreFile given a Path. + * @param fileProvider a lambda expression with logic for loading a HStoreFile given a Path. * @return HStoreFile reference for the newly created file. * @throws IOException if any error occurs. */ @Override - protected HStoreFile createFileInStoreDir(Path newFile, - Function fileAccessor) throws IOException { - return fileAccessor.apply(newFile); + protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider) + throws IOException { + return fileProvider.createFile(newFile); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 82f5d50f7c37..4c586863f3d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1973,7 +1973,7 @@ public DummyCompactor(Configuration conf, HStore store) { @Override public List commitCompaction(CompactionRequestImpl cr, - List newFiles, User user, Function fileAccessor){ + List newFiles, User user, StoreFileProvider fileProvider){ countDownLatch.countDown(); return null; }
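
The TestDirectStoreCompactor changes earlier in the series boil down to one invariant: a direct-in-store writer's parent directory is the column family directory itself, never a temp dir, which is why the reworked assertion compares only the parent's name against the family name. A tiny illustration of that invariant, using a hypothetical directory layout (the real test obtains the writer from initWriter on a live mini-cluster store):

import java.nio.file.Path;
import java.nio.file.Paths;

public class WriterLocationSketch {
  public static void main(String[] args) {
    String cfName = "cf";
    // Hypothetical paths; the real test derives these from the mini cluster.
    Path directWriter = Paths.get("/hbase/data/default/tbl/region1/cf/seg0001");
    Path tmpWriter = Paths.get("/hbase/data/default/tbl/region1/.tmp/seg0001");
    // The invariant the test asserts: parent dir name equals the family name.
    System.out.println(cfName.equals(directWriter.getParent().getFileName().toString())); // true
    System.out.println(cfName.equals(tmpWriter.getParent().getFileName().toString()));    // false
  }
}

Comparing only the parent's name also keeps the test independent of where the region root happens to live on the test filesystem.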
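The last patch swaps java.util.function.Function for the dedicated StoreFileProvider interface because Function.apply cannot declare checked exceptions; that is why the earlier HStore call site had to catch IOException and rethrow it wrapped in a RuntimeException. Below is a minimal, self-contained sketch of the pattern: StoreFileProvider and createFile mirror the patch, while everything else (the class name, the String return type, the commit helper) is an illustrative stand-in rather than actual HBase code.

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class StoreFileProviderSketch {

  // Analogue of Compactor.StoreFileProvider: the single abstract method
  // declares IOException, so IO-throwing lambdas fit without any wrapping.
  @FunctionalInterface
  interface StoreFileProvider {
    String createFile(Path path) throws IOException;
  }

  // Stand-in for HStore.createStoreFileAndReader, which throws IOException.
  static String createStoreFileAndReader(Path path) throws IOException {
    return "HStoreFile(" + path + ")";
  }

  // Stand-in for Compactor.commitCompaction: the checked exception now
  // propagates naturally to the caller instead of escaping as unchecked.
  static String commit(Path newFile, StoreFileProvider provider) throws IOException {
    return provider.createFile(newFile);
  }

  public static void main(String[] args) throws IOException {
    // Equivalent of the simplified HStore call site:
    //   commitCompaction(cr, newFiles, user, p -> createStoreFileAndReader(p));
    // With java.util.function.Function this lambda would not compile.
    System.out.println(commit(Paths.get("cf/0001"), p -> createStoreFileAndReader(p)));
  }
}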
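At commit time, the only behavioral difference between the two compactors is the extra rename: the base Compactor.createFileInStoreDir moves the finished file from its temp location into the family directory via commitStoreFile before building the HStoreFile, while DirectStoreCompactor's override skips the move because its writer already sits inside the family directory. A rough sketch of the two paths, using java.nio and made-up helper names (commitDefault, commitDirect) in place of the real HRegionFileSystem API:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CommitPathSketch {

  @FunctionalInterface
  interface StoreFileProvider {
    String createFile(Path path) throws IOException;
  }

  // Default flavor: the new file lands in a temp dir first, so commit must
  // rename it into the family dir (analogue of commitStoreFile).
  static String commitDefault(Path familyDir, Path tmpFile, StoreFileProvider provider)
      throws IOException {
    Path dest = familyDir.resolve(tmpFile.getFileName());
    Files.move(tmpFile, dest); // the extra rename the direct flavor avoids
    return provider.createFile(dest);
  }

  // Direct flavor: the writer was opened inside the family dir, so the file
  // is already in its final location and only the HStoreFile is created.
  static String commitDirect(Path alreadyInPlace, StoreFileProvider provider)
      throws IOException {
    return provider.createFile(alreadyInPlace);
  }
}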
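Finally, the DummyCompactor pattern in TestHStore, where commitCompaction is overridden to count down a latch, is a compact way to assert that doCompaction delegates the commit without running a real compaction. A stripped-down sketch of that pattern, with stand-in classes rather than the real Compactor hierarchy:

import java.util.concurrent.CountDownLatch;

public class DelegationCheckSketch {

  // Stand-in for the Compactor base class.
  static class Compactor {
    void commitCompaction() { }
  }

  // Mirrors the DummyCompactor trick: override the method under test and
  // count down a latch so the caller can assert that it was invoked.
  static class DummyCompactor extends Compactor {
    static final CountDownLatch latch = new CountDownLatch(1);
    @Override
    void commitCompaction() {
      latch.countDown();
    }
  }

  public static void main(String[] args) {
    new DummyCompactor().commitCompaction();
    // Equivalent of: assertEquals(0, DummyCompactor.countDownLatch.getCount());
    System.out.println("commit invoked: " + (DummyCompactor.latch.getCount() == 0));
  }
}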