diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index dcbb71582f44..1d702d0ddc61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -297,6 +297,35 @@ public void enableCacheOnWrite() {
this.cacheBloomsOnWrite = true;
}
+ /**
+ * Checks whether cache-on-write should be enabled for compactions, and sets the
+ * related properties accordingly.
+ *
+ * To be called only in the context of compactions; for flushes or other write
+ * operations, use enableCacheOnWrite.
+ *
+ * If the hbase.rs.cachecompactedblocksonwrite configuration is set to true and
+ * 'totalCompactedFilesSize' does not exceed 'cacheCompactedDataOnWriteThreshold',
+ * enables cache on write for the following properties:
+ * - cacheDataOnWrite
+ * - cacheIndexesOnWrite
+ * - cacheBloomsOnWrite
+ *
+ * Otherwise, sets only 'cacheDataOnWrite' to false.
+ *
+ * @param totalCompactedFilesSize the total size of the compacted files.
+ * @return true if the checks above pass and the cache is enabled, false otherwise.
+ */
+ public boolean enableCacheOnWriteForCompactions(long totalCompactedFilesSize) {
+ if (shouldCacheCompactedBlocksOnWrite() && totalCompactedFilesSize <=
+ getCacheCompactedBlocksOnWriteThreshold()) {
+ enableCacheOnWrite();
+ return true;
+ } else {
+ setCacheDataOnWrite(false);
+ return false;
+ }
+ }
+
/**
* @return true if index blocks should be written to the cache when an HFile
* is written, false if not
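For illustration, a minimal usage sketch of the new helper on the compaction write path, mirroring the HStore and DirectStoreCompactor changes below (the surrounding variables are assumed to exist in the caller):

  // Sketch only: decide cache-on-write for a compaction output writer.
  CacheConfig writerCacheConf = new CacheConfig(store.getCacheConfig());
  if (writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize)) {
    // Data, index and bloom blocks of the compacted output will be cached on write.
  } else {
    // Feature disabled or threshold exceeded: cacheDataOnWrite was forced back to false.
  }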
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java
similarity index 86%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java
index 0031b0e498ea..63d5e9cab979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlushContext.java
@@ -30,14 +30,14 @@
* directly in the store dir, and therefore, doesn't perform a rename from tmp dir
* into the store dir.
*
- * To be used only when PersistedStoreEngine is configured as the StoreEngine implementation.
+ * To be used only when a StoreEngine implementation that flushes directly into the store dir is configured.
*/
@InterfaceAudience.Private
-public class PersistedStoreFlushContext extends DefaultStoreFlushContext {
+public class DirectStoreFlushContext extends DefaultStoreFlushContext {
- private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFlushContext.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DirectStoreFlushContext.class);
- public PersistedStoreFlushContext(HStore store, Long cacheFlushSeqNum,
+ public DirectStoreFlushContext(HStore store, Long cacheFlushSeqNum,
FlushLifeCycleTracker tracker) {
super.init(store, cacheFlushSeqNum, tracker);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java
similarity index 94%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java
index 9094e58b938c..a91fb42829f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DirectStoreFlusher.java
@@ -33,12 +33,12 @@
/**
* A StoreFlusher that writes hfiles directly into the actual store directory,
- * instead of a temp dir..
+ * instead of a temp dir.
*/
@InterfaceAudience.Private
-public class PersistedEngineStoreFlusher extends DefaultStoreFlusher {
+public class DirectStoreFlusher extends DefaultStoreFlusher {
- public PersistedEngineStoreFlusher(Configuration conf, HStore store) {
+ public DirectStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ce0c641fbb34..6ca6950154d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -77,7 +77,6 @@
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -431,7 +430,7 @@ public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
return ttl;
}
- StoreContext getStoreContext() {
+ public StoreContext getStoreContext() {
return storeContext;
}
@@ -694,7 +693,7 @@ private void refreshStoreFilesInternal(Collection newFiles) throw
refreshStoreSizeAndTotalBytes();
}
- protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
+ HStoreFile createStoreFileAndReader(final Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
p, isPrimaryReplicaStore());
return createStoreFileAndReader(info);
@@ -1145,16 +1144,13 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
// if data blocks are to be cached on write
// during compaction, we should forcefully
// cache index and bloom blocks as well
- if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
- .getCacheCompactedBlocksOnWriteThreshold()) {
- writerCacheConf.enableCacheOnWrite();
+ if (writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize)) {
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
"cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
} else {
- writerCacheConf.setCacheDataOnWrite(false);
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
// checking condition once again for logging
LOG.debug(
@@ -1194,27 +1190,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
HFileContext createFileContext(Compression.Algorithm compression,
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
- if (compression == null) {
- compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
- }
- ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
- HFileContext hFileContext = new HFileContextBuilder()
- .withIncludesMvcc(includeMVCCReadpoint)
- .withIncludesTags(includesTag)
- .withCompression(compression)
- .withCompressTags(family.isCompressTags())
- .withChecksumType(StoreUtils.getChecksumType(conf))
- .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
- .withBlockSize(family.getBlocksize())
- .withHBaseCheckSum(true)
- .withDataBlockEncoding(family.getDataBlockEncoding())
- .withEncryptionContext(encryptionContext)
- .withCreateTime(EnvironmentEdgeManager.currentTime())
- .withColumnFamily(getColumnFamilyDescriptor().getName())
- .withTableName(getTableName().getName())
- .withCellComparator(getComparator())
- .build();
- return hFileContext;
+ return storeEngine.createFileContext(compression, includeMVCCReadpoint, includesTag,
+ encryptionContext, getColumnFamilyDescriptor(), getTableName(), getComparator(), conf);
}
private long getTotalSize(Collection<HStoreFile> sfs) {
@@ -1508,7 +1485,8 @@ protected List doCompaction(CompactionRequestImpl cr,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
- List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
+ List<HStoreFile> sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user,
+ p -> createStoreFileAndReader(p));
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
@@ -1549,21 +1527,6 @@ private void setStoragePolicyFromFileName(List newFiles) throws IOExceptio
}
}
- private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
- List<Path> newFiles, User user) throws IOException {
- List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
- for (Path newFile : newFiles) {
- assert newFile != null;
- HStoreFile sf = moveFileIntoPlace(newFile);
- if (this.getCoprocessorHost() != null) {
- getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
- }
- assert sf != null;
- sfs.add(sf);
- }
- return sfs;
- }
-
// Package-visible for tests
HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
validateStoreFile(newFile);
@@ -2014,7 +1977,7 @@ protected void finishCompactionRequest(CompactionRequestImpl cr) {
* operation.
* @param path the path to the store file
*/
- private void validateStoreFile(Path path) throws IOException {
+ public void validateStoreFile(Path path) throws IOException {
HStoreFile storeFile = null;
try {
storeFile = createStoreFileAndReader(path);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 60b3c3d0d20f..0cf86b170cfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -24,9 +24,17 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -128,4 +136,46 @@ private void createComponentsOnce(
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
+
+ /**
+ * Constructs an HFileContext instance for the given store.
+ * @param compression the Compression.Algorithm to be used.
+ * @param includeMVCCReadpoint whether MVCC read point is to be used.
+ * @param includesTag whether Tags should be included.
+ * @param encryptionContext the Encryption.Context to be used.
+ * @param familyDescriptor ColumnFamilyDescriptor with info about the column family.
+ * @param tableName the table name this store belongs to.
+ * @param cellComparator the CellComparator to be used.
+ * @param conf the cluster configuration.
+ * @return an HFileContext instance.
+ */
+ public HFileContext createFileContext(Compression.Algorithm compression,
+ boolean includeMVCCReadpoint, boolean includesTag,
+ Encryption.Context encryptionContext,
+ ColumnFamilyDescriptor familyDescriptor,
+ TableName tableName,
+ CellComparator cellComparator,
+ Configuration conf) {
+ if (compression == null) {
+ compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+ }
+ ColumnFamilyDescriptor family = familyDescriptor;
+ HFileContext hFileContext = new HFileContextBuilder()
+ .withIncludesMvcc(includeMVCCReadpoint)
+ .withIncludesTags(includesTag)
+ .withCompression(compression)
+ .withCompressTags(family.isCompressTags())
+ .withChecksumType(StoreUtils.getChecksumType(conf))
+ .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
+ .withBlockSize(family.getBlocksize())
+ .withHBaseCheckSum(true)
+ .withDataBlockEncoding(family.getDataBlockEncoding())
+ .withEncryptionContext(encryptionContext)
+ .withCreateTime(EnvironmentEdgeManager.currentTime())
+ .withColumnFamily(family.getName())
+ .withTableName(tableName.getName())
+ .withCellComparator(cellComparator)
+ .build();
+ return hFileContext;
+ }
}
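A brief sketch of the extension point this method opens up: a custom engine can now adjust the HFileContext in one place instead of in HStore (the subclass name below is hypothetical, not part of this patch; imports are the ones added to StoreEngine.java above):

  // Sketch only; "CustomStoreEngine" is illustrative.
  public class CustomStoreEngine extends DefaultStoreEngine {
    @Override
    public HFileContext createFileContext(Compression.Algorithm compression,
        boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext,
        ColumnFamilyDescriptor familyDescriptor, TableName tableName,
        CellComparator cellComparator, Configuration conf) {
      // Reuse the default construction; engine-specific tweaks could be applied here.
      return super.createFileContext(compression, includeMVCCReadpoint, includesTag,
          encryptionContext, familyDescriptor, tableName, cellComparator, conf);
    }
  }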
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 6483111c6f62..519f59a65cd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -136,6 +137,12 @@ protected static class FileDetails {
public long minSeqIdToKeep = 0;
/** Total size of the compacted files **/
private long totalCompactedFilesSize = 0;
+
+ public long getTotalCompactedFilesSize() {
+ return totalCompactedFilesSize;
+ }
+
}
/**
@@ -266,7 +273,9 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
+ * @deprecated Use initWriter instead.
*/
+ @Deprecated
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
@@ -278,6 +287,21 @@ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDr
HConstants.EMPTY_STRING);
}
+ /**
+ * Default method for initializing a StoreFileWriter in the compaction process. This creates the
+ * resulting files in a temp directory; therefore, at compaction commit time, these files
+ * should be renamed into the actual store dir.
+ * @param fd the file details.
+ * @param shouldDropBehind boolean for the drop-behind output stream cache settings.
+ * @param major whether the compaction is major.
+ * @return Writer for a new StoreFile in the tmp dir.
+ * @throws IOException if it fails to initialize the writer.
+ */
+ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
+ throws IOException {
+ return createTmpWriter(fd, shouldDropBehind, major);
+ }
+
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException {
return store.createWriterInTmp(fd.maxKeyCount,
@@ -533,4 +557,57 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);
}
+
+ /**
+ * Default implementation for committing store files created by a compaction. Assumes the new
+ * files were created in a temp directory, so it renames them into the actual store dir, then
+ * creates a reader for each file and caches it into the store.
+ * @param cr the compaction request.
+ * @param newFiles the new files created by this compaction under a temp dir.
+ * @param user the running user.
+ * @param fileProvider a lambda expression with logic for loading an HStoreFile given a Path.
+ * @return A list of the resulting store files already placed in the store dir and loaded into the
+ * store cache.
+ * @throws IOException if the commit fails.
+ */
+ public List<HStoreFile> commitCompaction(CompactionRequestImpl cr, List<Path> newFiles,
+ User user, StoreFileProvider fileProvider) throws IOException {
+ List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
+ for (Path newFile : newFiles) {
+ assert newFile != null;
+ this.store.validateStoreFile(newFile);
+ // Move the file into the right spot
+ HStoreFile sf = createFileInStoreDir(newFile, fileProvider);
+ if (this.store.getCoprocessorHost() != null) {
+ this.store.getCoprocessorHost().postCompact(this.store, sf, cr.getTracker(), cr, user);
+ }
+ assert sf != null;
+ sfs.add(sf);
+ }
+ return sfs;
+ }
+
+ /**
+ * Assumes the new file was initially created in a temp folder.
+ * Moves the new file from temp to the actual store directory, then creates the related
+ * HStoreFile instance.
+ * @param newFile the new file created.
+ * @param fileProvider a lambda expression with logic for creating an HStoreFile given a Path.
+ * @return an HStoreFile instance.
+ * @throws IOException if the store file creation fails.
+ */
+ protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider)
+ throws IOException {
+ Path destPath = this.store.getRegionFileSystem().
+ commitStoreFile(this.store.getColumnFamilyName(), newFile);
+ return fileProvider.createFile(destPath);
+ }
+
+ /**
+ * Functional interface allowing callers to provide the specific logic for creating StoreFiles.
+ */
+ @FunctionalInterface
+ public interface StoreFileProvider {
+ HStoreFile createFile(Path path) throws IOException;
+ }
}
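For reference, a minimal usage sketch of the new commit hook, as wired up in HStore.doCompaction earlier in this patch; the lambda supplies the store-specific file loading:

  // Sketch only: commit the compaction output and load the resulting store files.
  List<HStoreFile> committed = storeEngine.compactor.commitCompaction(
      cr, newFiles, user, p -> createStoreFileAndReader(p));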
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 839d067ecd14..ac228a1b4532 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -48,14 +48,13 @@ public DefaultCompactor(Configuration conf, HStore store) {
}
private final CellSinkFactory<StoreFileWriter> writerFactory =
- new CellSinkFactory<StoreFileWriter>() {
- @Override
- public StoreFileWriter createWriter(InternalScanner scanner,
- org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
- boolean shouldDropBehind, boolean major) throws IOException {
- return createTmpWriter(fd, shouldDropBehind, major);
- }
- };
+ new CellSinkFactory<StoreFileWriter>() {
+ @Override
+ public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
+ boolean shouldDropBehind, boolean major) throws IOException {
+ return initWriter(fd, shouldDropBehind, major);
+ }
+ };
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
@@ -107,4 +106,5 @@ protected void abortWriter(StoreFileWriter writer) throws IOException {
e);
}
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java
new file mode 100644
index 000000000000..bbaaade8a4a8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DirectStoreCompactor.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Alternative Compactor implementation that writes compacted files straight
+ * into the store directory.
+ *
+ * This class extends DefaultCompactor, overriding the initWriter and createFileInStoreDir
+ * methods to create compacted files in the final store directory rather than a temp dir,
+ * avoiding the need to perform renames at compaction commit time.
+ */
+@InterfaceAudience.Private
+public class DirectStoreCompactor extends DefaultCompactor {
+ public DirectStoreCompactor(Configuration conf, HStore store) {
+ super(conf, store);
+ }
+
+ /**
+ * Overrides the original Compactor implementation to create the resulting file directly
+ * in the store dir, rather than a temp dir, avoiding the need for a rename at commit time.
+ * @param fd the file details.
+ * @param shouldDropBehind boolean for the drop-behind output stream cache settings.
+ * @param major whether the compaction is major.
+ * @return an instance of StoreFileWriter for the given file details.
+ * @throws IOException if any error occurs.
+ */
+ @Override
+ protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
+ throws IOException {
+ // When all MVCC readpoints are 0, don't write them.
+ // See HBASE-8166, HBASE-12600, and HBASE-13389.
+ return createWriterInStoreDir(fd.maxKeyCount,
+ major ? majorCompactionCompression : minorCompactionCompression,
+ fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0,
+ shouldDropBehind, fd.getTotalCompactedFilesSize());
+ }
+
+ private StoreFileWriter createWriterInStoreDir(long maxKeyCount,
+ Compression.Algorithm compression, boolean includeMVCCReadpoint, boolean includesTag,
+ boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
+ final CacheConfig writerCacheConf = new CacheConfig(store.getCacheConfig());
+ writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize);
+ InetSocketAddress[] favoredNodes = store.getStoreContext().getFavoredNodes();
+ HFileContext hFileContext = store.getStoreEngine().
+ createFileContext(compression, includeMVCCReadpoint, includesTag,
+ store.getStoreContext().getEncryptionContext(), store.getColumnFamilyDescriptor(),
+ store.getTableName(), store.getComparator(), conf);
+ Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(),
+ store.getColumnFamilyDescriptor().getNameAsString());
+ StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
+ store.getFileSystem())
+ .withOutputDir(familyDir)
+ .withBloomType(store.getColumnFamilyDescriptor().getBloomFilterType())
+ .withMaxKeyCount(maxKeyCount)
+ .withFavoredNodes(favoredNodes)
+ .withFileContext(hFileContext)
+ .withShouldDropCacheBehind(shouldDropBehind)
+ .withCompactedFilesSupplier(() -> store.getCompactedFiles());
+ return builder.build();
+ }
+
+ /**
+ * Overrides the original Compactor implementation. Since the passed file is already in the
+ * store directory, it only creates the related HStoreFile for the passed Path.
+ * @param newFile the new file created.
+ * @param fileProvider a lambda expression with logic for loading an HStoreFile given a Path.
+ * @return HStoreFile reference for the newly created file.
+ * @throws IOException if any error occurs.
+ */
+ @Override
+ protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider)
+ throws IOException {
+ return fileProvider.createFile(newFile);
+ }
+}
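For illustration, a configuration sketch that selects the new compactor through the default store engine, using the same DEFAULT_COMPACTOR_CLASS_KEY exercised by the TestHStore change below (whether this is the intended production wiring is an assumption):

  // Sketch only: plug DirectStoreCompactor into DefaultStoreEngine via configuration.
  Configuration conf = HBaseConfiguration.create();
  conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
      DirectStoreCompactor.class.getName());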
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java
similarity index 90%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java
index 298116e69e9e..b4b22263d3d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlushContext.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -38,14 +37,14 @@
import org.mockito.ArgumentCaptor;
/**
- * Test class for the TestPersistedStoreFlushContext
+ * Test class for DirectStoreFlushContext
*/
@Category({ RegionServerTests.class, MediumTests.class })
-public class TestPersistedStoreFlushContext {
+public class TestDirectStoreFlushContext {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestPersistedStoreFlushContext.class);
+ HBaseClassTestRule.forClass(TestDirectStoreFlushContext.class);
@Rule
public TestName name = new TestName();
@@ -63,7 +62,7 @@ public void testCommit() throws Exception {
HStoreFile mockStoreFile = mock(HStoreFile.class);
when(mockStoreFile.getReader()).thenReturn(mock(StoreFileReader.class));
when(mockStore.createStoreFileAndReader(captor.capture())).thenReturn(mockStoreFile);
- PersistedStoreFlushContext context = new PersistedStoreFlushContext(mockStore,
+ DirectStoreFlushContext context = new DirectStoreFlushContext(mockStore,
0L, FlushLifeCycleTracker.DUMMY);
context.tempFiles = new ArrayList<>();
context.tempFiles.add(filePath);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java
similarity index 95%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java
index e1fb6d04dd55..0d7cb789d5bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreFlusher.java
@@ -53,14 +53,14 @@
import org.junit.rules.TestName;
/**
- * Test class for the TestPersistedStoreFlushContext
+ * Test class for DirectStoreFlusher
*/
@Category({ RegionServerTests.class, MediumTests.class })
-public class TestPersistedEngineStoreFlusher {
+public class TestDirectStoreFlusher {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestPersistedEngineStoreFlusher.class);
+ HBaseClassTestRule.forClass(TestDirectStoreFlusher.class);
@Rule
public TestName name = new TestName();
@@ -115,7 +115,7 @@ public void setup() throws Exception {
@Test
public void testCreateWriter() throws Exception {
- PersistedEngineStoreFlusher flusher = new PersistedEngineStoreFlusher(config, mockStore);
+ DirectStoreFlusher flusher = new DirectStoreFlusher(config, mockStore);
List<Path> files = flusher.flushSnapshot(mockSnapshot, 0, mock(MonitoredTask.class),
null, FlushLifeCycleTracker.DUMMY);
assertEquals(1, files.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 20d7a29898ac..4c586863f3d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -49,6 +50,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -92,6 +95,7 @@
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@@ -1731,6 +1735,34 @@ public void testHFileContextSetWithCFAndTable() throws Exception {
assertArrayEquals(table, hFileContext.getTableName());
}
+ @Test
+ public void testDoCompactionDelegatesCommit() throws Exception {
+ final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ conf.set(DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
+ HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
+ store = init(this.name.getMethodName(), conf);
+ Path storeDir = this.store.getStoreContext().getRegionFileSystem().getStoreDir("family");
+ StoreFileWriter w = new StoreFileWriter.Builder(conf, new CacheConfig(conf),
+ store.getFileSystem())
+ .withOutputDir(storeDir)
+ .withFileContext(fileContext)
+ .build();
+ w.appendMetadata(1, false);
+ w.close();
+ HStoreFile mockStoreFile = mock(HStoreFile.class);
+ List<HStoreFile> mockFiles = new ArrayList<>();
+ mockFiles.add(mockStoreFile);
+ List<Path> files = new ArrayList<>();
+ files.add(w.getPath());
+ DummyCompactor.countDownLatch = new CountDownLatch(1);
+ try {
+ store.doCompaction(mock(CompactionRequestImpl.class),
+ null, mock(User.class), 0, files);
+ fail("Expected doCompaction to throw");
+ } catch (Exception e) {
+ // expected: the mocks are incomplete; the latch below proves the commit was delegated
+ }
+ assertEquals(0, DummyCompactor.countDownLatch.getCount());
+ }
+
private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
@@ -1930,4 +1962,21 @@ public boolean add(T e) {
@Override
public List subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
}
+
+ private static class DummyCompactor extends DefaultCompactor {
+
+ static CountDownLatch countDownLatch;
+
+ public DummyCompactor(Configuration conf, HStore store) {
+ super(conf, store);
+ }
+
+ @Override
+ public List<HStoreFile> commitCompaction(CompactionRequestImpl cr,
+ List<Path> newFiles, User user, StoreFileProvider fileProvider) {
+ countDownLatch.countDown();
+ return null;
+ }
+
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java
new file mode 100644
index 000000000000..847d82693376
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test class for DefaultCompactor.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestDefaultCompactor {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDefaultCompactor.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private final Configuration config = new Configuration();
+ private final String cfName = "cf";
+
+ private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private TableName table;
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.startMiniCluster();
+ table = TableName.valueOf(name.getMethodName());
+ UTIL.createTable(table, Bytes.toBytes(cfName));
+
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testInitWriter() throws Exception {
+ HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0).
+ getRegions(table).get(0).getStores().get(0);
+ DefaultCompactor compactor = new DefaultCompactor(config, store);
+ Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class);
+ StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false);
+ Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp");
+ assertEquals(new Path(tmpPath, cfName), writer.getPath().getParent());
+ }
+
+ @Test
+ public void testCommitCompaction() throws Exception {
+ // Performs a put, then a flush, to create a valid store file
+ Table tbl = UTIL.getConnection().getTable(table);
+ Put put = new Put(Bytes.toBytes("row1"));
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("1"), Bytes.toBytes("v1"));
+ tbl.put(put);
+ UTIL.flush(table);
+ HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0).
+ getRegions(table).get(0).getStores().get(0);
+ // Moves the existing file into a tmp folder,
+ // so that we can use it later as a parameter for Compactor.commitCompaction
+ Path filePath = null;
+ List<Path> tmpFilesList = new ArrayList<>();
+ final HStoreFile file = (HStoreFile) store.getStorefiles().toArray()[0];
+ filePath = file.getPath();
+ Path tmpPath = new Path(store.getRegionFileSystem().getRegionDir(), ".tmp");
+ tmpPath = new Path(tmpPath, filePath.getName());
+ store.getFileSystem().rename(filePath, tmpPath);
+ tmpFilesList.add(tmpPath);
+ DefaultCompactor compactor = new DefaultCompactor(config, store);
+ // Passes the renamed original file, then asserts it has the proper store dir path
+ List<HStoreFile> result = compactor.commitCompaction(mock(CompactionRequestImpl.class),
+ tmpFilesList, null, p -> file);
+ assertEquals(1, result.size());
+ assertEquals(filePath, result.get(0).getPath());
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java
new file mode 100644
index 000000000000..0481b3299a48
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDirectStoreCompactor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+
+/**
+ * Test class for DirectStoreCompactor.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestDirectStoreCompactor {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDirectStoreCompactor.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private final Configuration config = new Configuration();
+ private final String cfName = "cf";
+
+ private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private TableName table;
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.startMiniCluster();
+ table = TableName.valueOf(name.getMethodName());
+ UTIL.createTable(table, Bytes.toBytes(cfName));
+
+ }
+
+ @After
+ public void shutdown() throws IOException {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testInitWriter() throws Exception {
+ HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0).
+ getRegions(table).get(0).getStores().get(0);
+ DirectStoreCompactor compactor = new DirectStoreCompactor(config, store);
+ Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class);
+ StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false);
+ // Asserts the parent dir is the family dir itself, not .tmp
+ assertEquals(cfName, writer.getPath().getParent().getName());
+ }
+
+ @Test
+ public void testCreateFileInStoreDir() throws Exception {
+ HStoreFile mockFile = mock(HStoreFile.class);
+ final StringBuilder builder = new StringBuilder();
+ HStore store = UTIL.getMiniHBaseCluster().getRegionServer(0).
+ getRegions(table).get(0).getStores().get(0);
+ DirectStoreCompactor compactor = new DirectStoreCompactor(config, store);
+ Compactor.FileDetails mockFileDetails = mock(Compactor.FileDetails.class);
+ StoreFileWriter writer = compactor.initWriter(mockFileDetails, false, false);
+ compactor.createFileInStoreDir(writer.getPath(), p -> {
+ builder.append(p.getParent().getName());
+ return mockFile;
+ });
+ assertEquals(writer.getPath().getParent().getName(), builder.toString());
+ }
+}