From e84d7a9617e0c654bcb80d66243d3fe55873d8a9 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Mon, 20 Mar 2023 13:52:58 +0800 Subject: [PATCH 1/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 171 +++++++++++++++++- .../hadoop/hbase/tool/TestBulkLoadHFiles.java | 94 ++++++++++ 2 files changed, 261 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index e54de3403e7a..2a674702ea0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; @@ -56,13 +57,17 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.AsyncAdmin; import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class); + /** + * Keep locality while generating HFiles for bulkload. See HBASE-12596 + */ + public static final String LOCALITY_SENSITIVE_CONF_KEY = + "hbase.bulkload.locality.sensitive.enabled"; + private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; + public static final String NAME = "completebulkload"; /** * Whether to run validation on hfiles before loading. @@ -540,7 +552,6 @@ private Pair, Set> groupOrSplitPhase Set, String>>> splittingFutures = new HashSet<>(); while (!queue.isEmpty()) { final LoadQueueItem item = queue.remove(); - final Callable, String>> call = () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); splittingFutures.add(pool.submit(call)); @@ -578,6 +589,50 @@ private String getUniqueName() { return UUID.randomUUID().toString().replaceAll("-", ""); } + private List splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item, + TableDescriptor tableDesc, byte[] splitKey) throws IOException { + Path hfilePath = item.getFilePath(); + byte[] family = item.getFamily(); + Path tmpDir = hfilePath.getParent(); + if (!tmpDir.getName().equals(TMP_DIR)) { + tmpDir = new Path(tmpDir, TMP_DIR); + } + + LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. 
Splitting..."); + + String uniqueName = getUniqueName(); + ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family); + + Path botOut = new Path(tmpDir, uniqueName + ".bottom"); + Path topOut = new Path(tmpDir, uniqueName + ".top"); + + splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); + + FileSystem fs = tmpDir.getFileSystem(getConf()); + fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); + fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); + fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); + + // Add these back at the *front* of the queue, so there's a lower + // chance that the region will just split again before we get there. + List lqis = new ArrayList<>(2); + lqis.add(new LoadQueueItem(family, botOut)); + lqis.add(new LoadQueueItem(family, topOut)); + + // If the current item is already the result of previous splits, + // we don't need it anymore. Clean up to save space. + // It is not part of the original input files. + try { + if (tmpDir.getName().equals(TMP_DIR)) { + fs.delete(hfilePath, false); + } + } catch (IOException e) { + LOG.warn("Unable to delete temporary split file " + hfilePath); + } + LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); + return lqis; + } + private List splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc, byte[] splitKey) throws IOException { Path hfilePath = item.getFilePath(); @@ -718,8 +773,9 @@ CacheConfig.DISABLED, true, getConf())) { checkRegionIndexValid(splitIdx, startEndKeys, tableName); } byte[] splitPoint = startEndKeys.get(splitIdx).getSecond(); - List lqis = - splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint); + List lqis = splitStoreFile(conn.getRegionLocator(tableName), item, + FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint); + return new Pair<>(lqis, null); } @@ -730,7 +786,7 @@ CacheConfig.DISABLED, true, getConf())) { /** * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom - * filters, etc. + * filters, etc */ @InterfaceAudience.Private static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc, @@ -743,6 +799,22 @@ static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescript copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); } + /** + * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata, + * recreating bloom filters, etc. + */ + @InterfaceAudience.Private + static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile, + ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut) + throws IOException { + // Open reader with no block cache, and not in-memory + Reference topReference = Reference.createTopReference(splitKey); + Reference bottomReference = Reference.createBottomReference(splitKey); + + copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc); + copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc); + } + /** * Copy half of an HFile into a new HFile. */ @@ -796,6 +868,97 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, } } + /** + * Copy half of an HFile into a new HFile with favored nodes. 
+ */ + private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, + Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc) + throws IOException { + FileSystem fs = inFile.getFileSystem(conf); + CacheConfig cacheConf = CacheConfig.DISABLED; + HalfStoreFileReader halfReader = null; + StoreFileWriter halfWriter = null; + try { + ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build(); + StoreFileInfo storeFileInfo = + new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference); + storeFileInfo.initHFileInfo(context); + halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf); + storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader()); + Map fileInfo = halfReader.loadFileInfo(); + + int blocksize = familyDescriptor.getBlocksize(); + Algorithm compression = familyDescriptor.getCompressionType(); + BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); + HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) + .withChecksumType(StoreUtils.getChecksumType(conf)) + .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize) + .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) + .build(); + + HFileScanner scanner = halfReader.getScanner(false, false, false); + scanner.seekTo(); + do { + final Cell cell = scanner.getCell(); + if (null != halfWriter) { + halfWriter.append(cell); + } else { + + // init halfwriter + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + byte[] rowKey = CellUtil.cloneRow(cell); + HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); + InetSocketAddress[] favoredNodes = null; + if (null == hRegionLocation) { + LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey)); + } else { + LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); + InetSocketAddress initialIsa = + new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort()); + if (initialIsa.isUnresolved()) { + LOG.trace("Failed resolve address {}, use default writer", + hRegionLocation.getHostnamePort()); + } else { + LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); + favoredNodes = new InetSocketAddress[] { initialIsa }; + } + } + if (null == favoredNodes) { + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); + } else { + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); + } + } else { + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); + } + halfWriter.append(cell); + } + + } while (scanner.next()); + + for (Map.Entry entry : fileInfo.entrySet()) { + if (shouldCopyHFileMetaKey(entry.getKey())) { + halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); + } + } + } finally { + if (halfReader != null) { + try { + halfReader.close(cacheConf.shouldEvictOnClose()); + } catch (IOException e) { + LOG.warn("failed to close hfile reader for " + inFile, e); + } + } + if (halfWriter != null) { + halfWriter.close(); + } + } + } + /** * Infers region boundaries for a new table. *
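[Not part of the patch; context for reviewers.] With hbase.bulkload.locality.sensitive.enabled set (it defaults to true; see the HBASE-12596 reference in the new constant's javadoc), the copyHFileHalf overload added above looks up the region that will host the first cell of each split half and passes that RegionServer to the StoreFileWriter as a favored node, so HDFS places the half-file's blocks on the server that will eventually open it. Below is a minimal sketch of that lookup, factored into a hypothetical helper: the class and method names are illustrative and do not exist in the patch, but the calls are the ones the patch itself uses.

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.util.FutureUtils;

// Hypothetical helper, not part of the patch: the favored-nodes lookup that the new
// copyHFileHalf overload performs inline when it builds the writer for the first cell.
final class FavoredNodeLookup {

  /** Returns a single favored node for the region hosting rowKey, or null to fall back. */
  static InetSocketAddress[] resolve(AsyncTableRegionLocator locator, byte[] rowKey)
      throws IOException {
    // Locate the region that will serve this row once the bulk load completes.
    HRegionLocation location = FutureUtils.get(locator.getRegionLocation(rowKey));
    if (location == null) {
      return null; // location unknown: use a writer without favored nodes
    }
    InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
    // An unresolved address cannot be handed to HDFS for block placement.
    return isa.isUnresolved() ? null : new InetSocketAddress[] { isa };
  }
}

A null result corresponds to the builder path without withFavoredNodes(), so a failed or unresolved location lookup degrades to default block placement rather than failing the split.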
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index fecf4c7ec2c2..910162a2ac12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -43,10 +44,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Table; @@ -63,7 +66,12 @@ import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.hamcrest.MatcherAssert; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -569,6 +577,37 @@ public void testSplitStoreFile() throws IOException { assertEquals(1000, rowCount); } + /** + * Test hfile splits with the favored nodes + */ + @Test + public void testSplitStoreFileWithFavoriteNodes() throws IOException { + + Path dir = new Path(util.getDefaultRootDirPath(), "testhfile"); + FileSystem fs = util.getDFSCluster().getFileSystem(); + + // FileSystem fs = util.getTestFileSystem(); + Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes"); + ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); + String tableName = tn.getMethodName(); + Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); + HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, + Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); + + Path bottomOut = new Path(dir, "bottom.out"); + Path topOut = new Path(dir, "top.out"); + + final AsyncTableRegionLocator regionLocator = + util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)); + BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc, + Bytes.toBytes("ggg"), bottomOut, topOut); + verifyHFileFavoriteNode(topOut, regionLocator, fs); + verifyHFileFavoriteNode(bottomOut, regionLocator, fs); + int rowCount = verifyHFile(bottomOut); + rowCount += verifyHFile(topOut); + assertEquals(1000, rowCount); + } + @Test public void testSplitStoreFileWithCreateTimeTS() throws IOException { Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS"); @@ -654,6 +693,61 @@ private void verifyHFileCreateTimeTS(Path p) throws IOException { } } + /** + * test split storefile with favorite node 
information + */ + private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs) + throws IOException { + Configuration conf = util.getConfiguration(); + + try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) { + + final byte[] firstRowkey = reader.getFirstRowKey().get(); + final HRegionLocation hRegionLocation = + FutureUtils.get(regionLocator.getRegionLocation(firstRowkey)); + + final String targetHostName = hRegionLocation.getHostname(); + + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + String pathStr = p.toUri().getPath(); + LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(pathStr, 0L); + + boolean isFavoriteNode = false; + List locatedBlocks = blocks.getLocatedBlocks(); + int index = 0; + do { + if (index > 0) { + assertTrue("failed use favored nodes", isFavoriteNode); + } + isFavoriteNode = false; + final LocatedBlock block = locatedBlocks.get(index); + + final DatanodeInfo[] locations = block.getLocations(); + for (DatanodeInfo location : locations) { + + final String hostName = location.getHostName(); + if ( + targetHostName.equals(hostName.equals("127.0.0.1") + ? InetAddress.getLocalHost().getHostAddress() + : "127.0.0.1") + ) { + isFavoriteNode = true; + break; + } + } + + index++; + } while (index < locatedBlocks.size()); + if (index > 0) { + assertTrue("failed use favored nodes", isFavoriteNode); + } + + } + + } + } + private void addStartEndKeysForTest(TreeMap map, byte[] first, byte[] last) { Integer value = map.containsKey(first) ? map.get(first) : 0; map.put(first, value + 1); From 17301f10b307891711100be92358d64a6e586ae6 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Mon, 20 Mar 2023 14:59:27 +0800 Subject: [PATCH 2/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 112 ------------------ .../hadoop/hbase/tool/TestBulkLoadHFiles.java | 22 ++-- 2 files changed, 15 insertions(+), 119 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index 2a674702ea0a..279da2b2ffa6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -93,7 +93,6 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSVisitor; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -633,49 +632,6 @@ private List splitStoreFile(AsyncTableRegionLocator loc, LoadQueu return lqis; } - private List splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc, - byte[] splitKey) throws IOException { - Path hfilePath = item.getFilePath(); - byte[] family = item.getFamily(); - Path tmpDir = hfilePath.getParent(); - if (!tmpDir.getName().equals(TMP_DIR)) { - tmpDir = new Path(tmpDir, TMP_DIR); - } - - LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. 
Splitting..."); - - String uniqueName = getUniqueName(); - ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family); - - Path botOut = new Path(tmpDir, uniqueName + ".bottom"); - Path topOut = new Path(tmpDir, uniqueName + ".top"); - splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); - - FileSystem fs = tmpDir.getFileSystem(getConf()); - fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); - fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); - fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); - - // Add these back at the *front* of the queue, so there's a lower - // chance that the region will just split again before we get there. - List lqis = new ArrayList<>(2); - lqis.add(new LoadQueueItem(family, botOut)); - lqis.add(new LoadQueueItem(family, topOut)); - - // If the current item is already the result of previous splits, - // we don't need it anymore. Clean up to save space. - // It is not part of the original input files. - try { - if (tmpDir.getName().equals(TMP_DIR)) { - fs.delete(hfilePath, false); - } - } catch (IOException e) { - LOG.warn("Unable to delete temporary split file " + hfilePath); - } - LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); - return lqis; - } - /** * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending * order by start key @@ -784,21 +740,6 @@ CacheConfig.DISABLED, true, getConf())) { return null; } - /** - * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom - * filters, etc - */ - @InterfaceAudience.Private - static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc, - byte[] splitKey, Path bottomOut, Path topOut) throws IOException { - // Open reader with no block cache, and not in-memory - Reference topReference = Reference.createTopReference(splitKey); - Reference bottomReference = Reference.createBottomReference(splitKey); - - copyHFileHalf(conf, inFile, topOut, topReference, familyDesc); - copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc); - } - /** * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata, * recreating bloom filters, etc. @@ -815,59 +756,6 @@ static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc); } - /** - * Copy half of an HFile into a new HFile. 
- */ - private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, - Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException { - FileSystem fs = inFile.getFileSystem(conf); - CacheConfig cacheConf = CacheConfig.DISABLED; - HalfStoreFileReader halfReader = null; - StoreFileWriter halfWriter = null; - try { - ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build(); - StoreFileInfo storeFileInfo = - new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference); - storeFileInfo.initHFileInfo(context); - halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf); - storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader()); - Map fileInfo = halfReader.loadFileInfo(); - - int blocksize = familyDescriptor.getBlocksize(); - Algorithm compression = familyDescriptor.getCompressionType(); - BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); - HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withChecksumType(StoreUtils.getChecksumType(conf)) - .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize) - .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) - .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); - halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) - .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); - HFileScanner scanner = halfReader.getScanner(false, false, false); - scanner.seekTo(); - do { - halfWriter.append(scanner.getCell()); - } while (scanner.next()); - - for (Map.Entry entry : fileInfo.entrySet()) { - if (shouldCopyHFileMetaKey(entry.getKey())) { - halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); - } - } - } finally { - if (halfReader != null) { - try { - halfReader.close(cacheConf.shouldEvictOnClose()); - } catch (IOException e) { - LOG.warn("failed to close hfile reader for " + inFile, e); - } - } - if (halfWriter != null) { - halfWriter.close(); - } - } - } - /** * Copy half of an HFile into a new HFile with favored nodes. 
*/ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 910162a2ac12..5c98ceeee04d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -563,14 +563,17 @@ public void testSplitStoreFile() throws IOException { FileSystem fs = util.getTestFileSystem(); Path testIn = new Path(dir, "testhfile"); ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); + String tableName = tn.getMethodName(); + util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); Path bottomOut = new Path(dir, "bottom.out"); Path topOut = new Path(dir, "top.out"); - BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc, - Bytes.toBytes("ggg"), bottomOut, topOut); + BulkLoadHFilesTool.splitStoreFile( + util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), + util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); int rowCount = verifyHFile(bottomOut); rowCount += verifyHFile(topOut); @@ -586,7 +589,6 @@ public void testSplitStoreFileWithFavoriteNodes() throws IOException { Path dir = new Path(util.getDefaultRootDirPath(), "testhfile"); FileSystem fs = util.getDFSCluster().getFileSystem(); - // FileSystem fs = util.getTestFileSystem(); Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes"); ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); String tableName = tn.getMethodName(); @@ -614,14 +616,17 @@ public void testSplitStoreFileWithCreateTimeTS() throws IOException { FileSystem fs = util.getTestFileSystem(); Path testIn = new Path(dir, "testhfile"); ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); + String tableName = tn.getMethodName(); + util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); Path bottomOut = new Path(dir, "bottom.out"); Path topOut = new Path(dir, "top.out"); - BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc, - Bytes.toBytes("ggg"), bottomOut, topOut); + BulkLoadHFilesTool.splitStoreFile( + util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), + util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); verifyHFileCreateTimeTS(bottomOut); verifyHFileCreateTimeTS(topOut); @@ -654,14 +659,17 @@ private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadE Path testIn = new Path(dir, "testhfile"); ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); + String tableName = tn.getMethodName(); + util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); Path bottomOut = new Path(dir, "bottom.out"); Path topOut = new Path(dir, "top.out"); - BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc, - Bytes.toBytes("ggg"), 
bottomOut, topOut); + BulkLoadHFilesTool.splitStoreFile( + util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), + util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); int rowCount = verifyHFile(bottomOut); rowCount += verifyHFile(topOut); From 3789acccb186ee46ee1531ee5b6185205709b2e1 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Tue, 21 Mar 2023 10:24:26 +0800 Subject: [PATCH 3/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index 279da2b2ffa6..49dbbf0dbefa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSVisitor; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -782,7 +783,7 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, .withChecksumType(StoreUtils.getChecksumType(conf)) .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize) .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) - .build(); + .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); HFileScanner scanner = halfReader.getScanner(false, false, false); scanner.seekTo(); @@ -798,27 +799,26 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); InetSocketAddress[] favoredNodes = null; if (null == hRegionLocation) { - LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey)); + LOG.warn("Failed get of location, use default writer {}", Bytes.toString(rowKey)); + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); } else { LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); InetSocketAddress initialIsa = new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort()); if (initialIsa.isUnresolved()) { - LOG.trace("Failed resolve address {}, use default writer", + LOG.warn("Failed resolve address {}, use default writer", hRegionLocation.getHostnamePort()); + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); } else { LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); favoredNodes = new InetSocketAddress[] { initialIsa }; + halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) + .withBloomType(bloomFilterType).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); } } - if (null == favoredNodes) { - halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) - .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); - } 
else { - halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) - .withBloomType(bloomFilterType).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build(); - } } else { halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); From 3fd6dd2a3d53916ec12eae0086717c3d5417d318 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Tue, 21 Mar 2023 14:45:53 +0800 Subject: [PATCH 4/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 5c98ceeee04d..60c640571c11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -735,6 +735,7 @@ private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocat for (DatanodeInfo location : locations) { final String hostName = location.getHostName(); + fail(" failed to use favored nodes, location.getHostName():" + location.getHostName()); if ( targetHostName.equals(hostName.equals("127.0.0.1") ? InetAddress.getLocalHost().getHostAddress() From d5d23bd8308d2e34d1a3ba174f7e7406b3fa723d Mon Sep 17 00:00:00 2001 From: alanzhao Date: Tue, 21 Mar 2023 19:04:16 +0800 Subject: [PATCH 5/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 60c640571c11..4be73bfa13e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -735,11 +735,10 @@ private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocat for (DatanodeInfo location : locations) { final String hostName = location.getHostName(); - fail(" failed to use favored nodes, location.getHostName():" + location.getHostName()); if ( targetHostName.equals(hostName.equals("127.0.0.1") - ? InetAddress.getLocalHost().getHostAddress() - : "127.0.0.1") + ? 
InetAddress.getLocalHost().getHostName() + : "127.0.0.1") || targetHostName.equals(hostName) ) { isFavoriteNode = true; break; From d4368113d4c8cba4a073d5134fb80aa8ee88d200 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Tue, 21 Mar 2023 23:34:40 +0800 Subject: [PATCH 6/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java index 4be73bfa13e7..c6cbb6458c53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java @@ -717,9 +717,9 @@ private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocat final String targetHostName = hRegionLocation.getHostname(); if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem) fs; String pathStr = p.toUri().getPath(); - LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(pathStr, 0L); + LocatedBlocks blocks = + ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L); boolean isFavoriteNode = false; List locatedBlocks = blocks.getLocatedBlocks(); From 5d0156ce7c84934ca579b26c6588c2fa9909abc2 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Thu, 23 Mar 2023 10:01:58 +0800 Subject: [PATCH 7/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index 49dbbf0dbefa..ec68ecab872b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -799,7 +799,8 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); InetSocketAddress[] favoredNodes = null; if (null == hRegionLocation) { - LOG.warn("Failed get of location, use default writer {}", Bytes.toString(rowKey)); + LOG.warn("Failed get location for region {} , Using writer without favoured nodes.", + hRegionLocation); halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); } else { @@ -807,8 +808,8 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, InetSocketAddress initialIsa = new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort()); if (initialIsa.isUnresolved()) { - LOG.warn("Failed resolve address {}, use default writer", - hRegionLocation.getHostnamePort()); + LOG.warn("Failed get location for region {} , Using writer without favoured nodes.", + hRegionLocation); halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); } else { From dd59627667088524e0d459b483be2db89a169cba Mon Sep 17 00:00:00 2001 From: alanzhao Date: Thu, 23 Mar 2023 13:33:39 +0800 Subject: [PATCH 8/9] 
HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index ec68ecab872b..d54483756c6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -799,8 +799,8 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); InetSocketAddress[] favoredNodes = null; if (null == hRegionLocation) { - LOG.warn("Failed get location for region {} , Using writer without favoured nodes.", - hRegionLocation); + LOG.warn("Failed get region location for rowkey {} , Using writer without favoured nodes.", + Bytes.toString(rowKey)); halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); } else { From c69454f4b2fe19517df1846f49d2d2fd0cfccc98 Mon Sep 17 00:00:00 2001 From: alanzhao Date: Thu, 23 Mar 2023 14:38:15 +0800 Subject: [PATCH 9/9] HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes --- .../java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java index d54483756c6e..9b4e1aea9066 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -799,7 +799,8 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); InetSocketAddress[] favoredNodes = null; if (null == hRegionLocation) { - LOG.warn("Failed get region location for rowkey {} , Using writer without favoured nodes.", + LOG.warn( + "Failed get region location for rowkey {} , Using writer without favoured nodes.", Bytes.toString(rowKey)); halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
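[Not part of the patch; usage note.] After this series the non-locator splitStoreFile overloads are gone, so every caller supplies an AsyncTableRegionLocator. A sketch of a call site, adapted from the updated tests in this series; the table name, paths, column family descriptor, and the "ggg" split key are the test's own illustrative fixtures, not fixed API values.

// Adapted from TestBulkLoadHFiles; inputs mirror the test fixtures used above.
private void splitWithFavoredNodes(HBaseTestingUtil util, String tableName, Path dir,
    Path testIn, ColumnFamilyDescriptor familyDesc) throws IOException {
  // The locator is what lets the tool resolve a favored node for each split half.
  AsyncTableRegionLocator locator =
    util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
  Path bottomOut = new Path(dir, "bottom.out");
  Path topOut = new Path(dir, "top.out");
  BulkLoadHFilesTool.splitStoreFile(locator, util.getConfiguration(), testIn, familyDesc,
    Bytes.toBytes("ggg"), bottomOut, topOut);
}

Each half can then be checked with verifyHFile for row counts and, on a DistributedFileSystem, with verifyHFileFavoriteNode for block placement, as testSplitStoreFileWithFavoriteNodes does above.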