@@ -297,6 +297,35 @@ public void enableCacheOnWrite() {
this.cacheBloomsOnWrite = true;
}

/**
* Checks if write cache should be enabled for compactions.
*
* To be called only in the context of compactions;
* for flushes or other write operations, use <code>enableCacheOnWrite</code>.
*
* If the hbase.rs.cachecompactedblocksonwrite configuration is set to true and
* 'totalCompactedFilesSize' does not exceed 'cacheCompactedBlocksOnWriteThreshold',
* enables cache on write for the following properties:
* - cacheDataOnWrite
* - cacheIndexesOnWrite
* - cacheBloomsOnWrite
*
* Otherwise, only 'cacheDataOnWrite' is set to false.
*
* @param totalCompactedFilesSize the total size of compacted files.
* @return true if the checks mentioned above pass and the cache is enabled, false otherwise.
*/
public boolean enableCacheOnWriteForCompactions(long totalCompactedFilesSize) {
if (shouldCacheCompactedBlocksOnWrite() && totalCompactedFilesSize <=
getCacheCompactedBlocksOnWriteThreshold()) {
enableCacheOnWrite();
return true;
} else {
setCacheDataOnWrite(false);
return false;
}
}
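
A minimal usage sketch (not part of this patch) of how the new method could be exercised from a test. The hbase.rs.cachecompactedblocksonwrite property name comes from the javadoc above; the threshold property name hbase.rs.cachecompactedblocksonwrite.threshold and the single-argument CacheConfig(Configuration) constructor are assumptions:

  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean("hbase.rs.cachecompactedblocksonwrite", true);
  // Assumed threshold property name; 1 MB threshold for this example.
  conf.setLong("hbase.rs.cachecompactedblocksonwrite.threshold", 1024 * 1024);
  CacheConfig cacheConf = new CacheConfig(conf);
  // Below the threshold: cache-on-write is enabled for data, index and bloom blocks.
  assertTrue(cacheConf.enableCacheOnWriteForCompactions(512 * 1024));
  assertTrue(cacheConf.shouldCacheDataOnWrite());
  // Above the threshold: only cacheDataOnWrite is forced off and false is returned.
  assertFalse(cacheConf.enableCacheOnWriteForCompactions(10 * 1024 * 1024));
  assertFalse(cacheConf.shouldCacheDataOnWrite());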

/**
* @return true if index blocks should be written to the cache when an HFile
* is written, false if not
@@ -30,14 +30,14 @@
* directly in the store dir, and therefore, doesn't perform a rename from tmp dir
* into the store dir.
*
* To be used only when PersistedStoreEngine is configured as the StoreEngine implementation.
* To be used only when a StoreEngine that flushes directly into the store directory is configured.
*/
@InterfaceAudience.Private
public class PersistedStoreFlushContext extends DefaultStoreFlushContext {
public class DirectStoreFlushContext extends DefaultStoreFlushContext {

private static final Logger LOG = LoggerFactory.getLogger(PersistedStoreFlushContext.class);
private static final Logger LOG = LoggerFactory.getLogger(DirectStoreFlushContext.class);

public PersistedStoreFlushContext(HStore store, Long cacheFlushSeqNum,
public DirectStoreFlushContext(HStore store, Long cacheFlushSeqNum,
FlushLifeCycleTracker tracker) {
super.init(store, cacheFlushSeqNum, tracker);
}
@@ -33,12 +33,12 @@

/**
* A StoreFlusher that writes hfiles directly into the actual store directory,
* instead of a temp dir..
* instead of a temp dir.
*/
@InterfaceAudience.Private
public class PersistedEngineStoreFlusher extends DefaultStoreFlusher {
public class DirectStoreFlusher extends DefaultStoreFlusher {

public PersistedEngineStoreFlusher(Configuration conf, HStore store) {
public DirectStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}

@@ -77,7 +77,6 @@
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -431,7 +430,7 @@ public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {
return ttl;
}

StoreContext getStoreContext() {
public StoreContext getStoreContext() {
return storeContext;
}

@@ -694,7 +693,7 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
refreshStoreSizeAndTotalBytes();
}

protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
HStoreFile createStoreFileAndReader(final Path p) throws IOException {
StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
p, isPrimaryReplicaStore());
return createStoreFileAndReader(info);
@@ -1145,16 +1144,13 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
// if data blocks are to be cached on write
// during compaction, we should forcefully
// cache index and bloom blocks as well
if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
.getCacheCompactedBlocksOnWriteThreshold()) {
writerCacheConf.enableCacheOnWrite();
if (writerCacheConf.enableCacheOnWriteForCompactions(totalCompactedFilesSize)) {
if (!cacheOnWriteLogged) {
LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
"cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
cacheOnWriteLogged = true;
}
} else {
writerCacheConf.setCacheDataOnWrite(false);
if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
// checking condition once again for logging
LOG.debug(
@@ -1194,27 +1190,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm

HFileContext createFileContext(Compression.Algorithm compression,
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag)
.withCompression(compression)
.withCompressTags(family.isCompressTags())
.withChecksumType(StoreUtils.getChecksumType(conf))
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
.withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(encryptionContext)
.withCreateTime(EnvironmentEdgeManager.currentTime())
.withColumnFamily(getColumnFamilyDescriptor().getName())
.withTableName(getTableName().getName())
.withCellComparator(getComparator())
.build();
return hFileContext;
return storeEngine.createFileContext(compression, includeMVCCReadpoint, includesTag,
encryptionContext, getColumnFamilyDescriptor(), getTableName(), getComparator(), conf);
}

private long getTotalSize(Collection<HStoreFile> sfs) {
@@ -1508,7 +1485,8 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
List<HStoreFile> sfs = this.storeEngine.compactor.commitCompaction(cr, newFiles, user,
p -> createStoreFileAndReader(p));
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
@@ -1549,21 +1527,6 @@ private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOExceptio
}
}

private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
assert sf != null;
sfs.add(sf);
}
return sfs;
}

// Package-visible for tests
HStoreFile moveFileIntoPlace(Path newFile) throws IOException {
validateStoreFile(newFile);
@@ -2014,7 +1977,7 @@ protected void finishCompactionRequest(CompactionRequestImpl cr) {
* operation.
* @param path the path to the store file
*/
private void validateStoreFile(Path path) throws IOException {
public void validateStoreFile(Path path) throws IOException {
HStoreFile storeFile = null;
try {
storeFile = createStoreFileAndReader(path);
@@ -24,9 +24,17 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;

@@ -128,4 +136,46 @@ private void createComponentsOnce(
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}

/**
* Constructs an <code>HFileContext</code> instance for the given store.
* @param compression the Compression.Algorithm to be used.
* @param includeMVCCReadpoint whether MVCC read point is to be used.
* @param includesTag whether Tags should be included.
* @param encryptionContext the Encryption.Context to be used.
* @param familyDescriptor ColumnFamilyDescriptor with info about the column family.
* @param tableName the table name this store belongs to.
* @param cellComparator the CellComparator to be used.
* @param conf the cluster configuration.
* @return an HFileContext instance.
*/
public HFileContext createFileContext(Compression.Algorithm compression,
boolean includeMVCCReadpoint, boolean includesTag,
Encryption.Context encryptionContext,
ColumnFamilyDescriptor familyDescriptor,
TableName tableName,
CellComparator cellComparator,
Configuration conf) {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
ColumnFamilyDescriptor family = familyDescriptor;
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(includeMVCCReadpoint)
.withIncludesTags(includesTag)
.withCompression(compression)
.withCompressTags(family.isCompressTags())
.withChecksumType(StoreUtils.getChecksumType(conf))
.withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
.withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(encryptionContext)
.withCreateTime(EnvironmentEdgeManager.currentTime())
.withColumnFamily(family.getName())
.withTableName(tableName.getName())
.withCellComparator(cellComparator)
.build();
return hFileContext;
}
}
@@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -136,6 +137,12 @@ protected static class FileDetails {
public long minSeqIdToKeep = 0;
/** Total size of the compacted files **/
private long totalCompactedFilesSize = 0;

public long getTotalCompactedFilesSize() {
return totalCompactedFilesSize;
}


}

/**
@@ -266,7 +273,9 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
* @param fd The file details.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
* @deprecated Use initWriter instead.
*/
@Deprecated
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
@@ -278,6 +287,21 @@ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDr
HConstants.EMPTY_STRING);
}

/**
* Default method for initializing a StoreFileWriter in the compaction process; it creates the
Contributor:
Is this right if we have the StoreEngine in the mix? It's supposed to do this, right?

Contributor Author:
StoreEngine delegates the work to a Compactor implementation, where DefaultCompactor, the default Compactor implementation, uses this one to create the file writer. That's why we say it's the "default" method. Alternative compactors may not use it, like the DirectStoreCompactor, which implements its own logic that creates the file in the store directory rather than under temp.

Member:
Is creating them in the temp directory really "default"? Or is it "default" just because the direct-insert stuff isn't available yet? ;)

When you say other compactors may not use it, it makes me wonder if it makes sense to provide it here.

Contributor Author:
> Is creating them in the temp directory really "default"? Or is it "default" just because the direct-insert stuff isn't available yet? ;)

It is the default now: if we don't explicitly set the hbase.hstore.defaultengine.compactor.class config property to the DirectStoreCompactor implementation, the pre-existing DefaultCompactor will be loaded, which implements this logic.

> When you say other compactors may not use it, it makes me wonder if it makes sense to provide it here.

What I meant is that other compactors may override the method with different logic.

Member:
> What I meant is that other compactors may override the method with different logic.

I'm just thinking about what other potential compaction algorithms may choose to do. I would expect that any future compactions would still want to leverage the "commit logic" here to avoid that rename. Just keeping tabs on "pre-optimization" without a specific use-case in mind. Although, as long as other compaction algorithms can extend DirectStoreCompactor and get the new shiny solution, it's not a concern.

* resulting files in a temp directory. Therefore, at compaction commit time, these files
* should be renamed into the actual store dir.
* @param fd the file details.
* @param shouldDropBehind boolean for the drop-behind output stream cache settings.
* @param major if compaction is major.
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if it fails to initialise the writer.
*/
protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
throws IOException {
return createTmpWriter(fd, shouldDropBehind, major);
}
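
For contrast with the review thread above, a purely hypothetical sketch (not part of this patch) of how a renameless compactor such as the DirectStoreCompactor the author mentions could override initWriter. The createWriterInStoreDir helper is an assumed counterpart of HStore.createWriterInTmp, not an existing API:

  @Override
  protected StoreFileWriter initWriter(FileDetails fd, boolean shouldDropBehind, boolean major)
      throws IOException {
    // Open the writer directly under the store directory, so commit time needs no rename.
    // createWriterInStoreDir is assumed here; the real change may name or shape it differently.
    return store.createWriterInStoreDir(fd, shouldDropBehind, major);
  }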

protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major) throws IOException {
return store.createWriterInTmp(fd.maxKeyCount,
@@ -533,4 +557,57 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);
}

/**
* Default implementation for committing store files created by a compaction. Assumes the new
* files were created in a temp directory, so it renames those files into the actual store dir,
* then creates a reader for each and caches it in the store.
* @param cr the compaction request.
* @param newFiles the new files created by this compaction under a temp dir.
* @param user the running user.
* @param fileProvider a lambda expression with logic for loading an HStoreFile given a Path.
* @return A list of the resulting store files already placed in the store dir and loaded into the
* store cache.
* @throws IOException if the commit fails.
*/
public List<HStoreFile> commitCompaction(CompactionRequestImpl cr, List<Path> newFiles,
User user, StoreFileProvider fileProvider) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
this.store.validateStoreFile(newFile);
// Move the file into the right spot
HStoreFile sf = createFileInStoreDir(newFile, fileProvider);
if (this.store.getCoprocessorHost() != null) {
this.store.getCoprocessorHost().postCompact(this.store, sf, cr.getTracker(), cr, user);
}
assert sf != null;
sfs.add(sf);
}
return sfs;
}

/**
* Assumes the new file was initially created in a temp folder.
Contributor:
Don't we want the Compactor asking the StoreEngine to do the writing for us, rather than doing it here?

Contributor Author:
We still need to vary the logic of "where" the store file is originally created. Default behaviour is to create it under temp (that's done by the call from lines #591/#592), whilst DirectStoreCompactor assumes the passed path is already in the store dir itself, so it just needs to create the reader over the passed path.

Member:
Same as above: I think it makes sense to not have the abstract Compactor deal with where storefiles are put as part of the compaction algorithm. That's an implementation detail of the algorithm itself. Can we reasonably push down where paths/files are stored into the implementation itself?

Contributor Author:
Originally, the logic of creating files in the temp dir was already implemented in the abstract Compactor, not in DefaultCompactor. That's why I left it here. I can make this method abstract here and move the implementation to DefaultCompactor, though.

Contributor Author:
Actually, when I tried to move it out of Compactor by making it abstract, it immediately broke StripeCompactor and DateTieredCompactor, which rely on this logic but don't extend DefaultCompactor.

Member:
> it immediately broke StripeCompactor and DateTieredCompactor, which rely on this logic but don't extend DefaultCompactor.

Devil's advocate: do you think that's correct for StripeC and DTC to do?

Contributor Author:
I honestly haven't looked further into these two. I'm only worried about not breaking the current logic they use. I was planning to take care of those later in another PR, if we are to make them compatible with direct store/renameless writing.

* Moves the new file from temp to the actual store directory, then creates the related
* HStoreFile instance.
* @param newFile the new file created.
* @param fileProvider a lambda expression with logic for creating an HStoreFile given a Path.
* @return an HStoreFile instance.
* @throws IOException if the store file creation fails.
*/
protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider)
throws IOException {
Path destPath = this.store.getRegionFileSystem().
commitStoreFile(this.store.getColumnFamilyName(), newFile);
return fileProvider.createFile(destPath);
}
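
Again illustrative only (not part of this patch): following the author's comment that DirectStoreCompactor assumes the passed path is already inside the store dir, such a compactor could override createFileInStoreDir to skip the commitStoreFile rename entirely:

  @Override
  protected HStoreFile createFileInStoreDir(Path newFile, StoreFileProvider fileProvider)
      throws IOException {
    // The new file already lives in the store directory; just open a reader over it.
    return fileProvider.createFile(newFile);
  }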

/**
* Functional interface allowing callers to provide the logic for creating StoreFiles.
*/
@FunctionalInterface
public interface StoreFileProvider {
HStoreFile createFile(Path path) throws IOException;
}
}
@@ -48,14 +48,13 @@ public DefaultCompactor(Configuration conf, HStore store) {
}

private final CellSinkFactory<StoreFileWriter> writerFactory =
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner,
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
return createTmpWriter(fd, shouldDropBehind, major);
}
};
new CellSinkFactory<StoreFileWriter>() {
@Override
public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind, boolean major) throws IOException {
return initWriter(fd, shouldDropBehind, major);
}
};

/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
@@ -107,4 +106,5 @@ protected void abortWriter(StoreFileWriter writer) throws IOException {
e);
}
}

}