From f366b1d74c4e46d845e45ba0b0bc7d0361f1d4d0 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Sun, 29 Dec 2024 02:40:53 -0500 Subject: [PATCH 1/9] Implement tableName translations --- .../org/apache/hadoop/hbase/HConstants.java | 3 + .../ReplicationSinkTrackerTableCreator.java | 2 +- .../regionserver/HFileReplicator.java | 93 ++++++----- .../IdentityReplicationSinkTranslator.java | 50 ++++++ .../regionserver/ReplicationSink.java | 146 ++++++++++-------- .../ReplicationSinkTranslator.java | 36 +++++ 6 files changed, 223 insertions(+), 107 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c882e71e877a..2eb28b56f294 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -968,6 +968,9 @@ public enum OperationStatusCode { public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl"; public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; + public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator"; + public static final String REPLICATION_SINK_TRANSLATOR_DEFAULT = + "org.apache.hadoop.hbase.replication.regionserver.DefaultReplicationSinkTranslator"; public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java index 38cf33090d9b..7df00b546c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java @@ -80,7 +80,7 @@ private ReplicationSinkTrackerTableCreator() { /* * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is - * enabled and table doesn't exists already. + * enabled and table doesn't exist already. */ public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 2d0b4e32ced0..326f9cbf47a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -94,11 +94,12 @@ public class HFileReplicator implements Closeable { private int maxCopyThreads; private int copiesPerThread; private List sourceClusterIds; + private ReplicationSinkTranslator translator; public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, Map>>> tableQueueMap, - Configuration conf, AsyncClusterConnection connection, List sourceClusterIds) - throws IOException { + Configuration conf, AsyncClusterConnection connection, List sourceClusterIds, + ReplicationSinkTranslator translator) throws IOException { this.sourceClusterConf = sourceClusterConf; this.sourceBaseNamespaceDirPath = 
sourceBaseNamespaceDirPath; this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath; @@ -106,6 +107,7 @@ public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespa this.conf = conf; this.connection = connection; this.sourceClusterIds = sourceClusterIds; + this.translator = translator; userProvider = UserProvider.instantiate(conf); fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); @@ -131,29 +133,30 @@ public void close() throws IOException { public Void replicate() throws IOException { // Copy all the hfiles to the local file system - Map tableStagingDirsMap = copyHFilesToStagingDir(); + Map tableToSinkStagingDir = copySourceHFilesToSinkStagingDir(); int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); - for (Entry tableStagingDir : tableStagingDirsMap.entrySet()) { - String tableNameString = tableStagingDir.getKey(); - Path stagingDir = tableStagingDir.getValue(); - TableName tableName = TableName.valueOf(tableNameString); + for (Entry tableStagingDir : tableToSinkStagingDir.entrySet()) { + String tableNameStr = tableStagingDir.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); + TableName sinkTableName = translator.getSinkTableName(tableName); + Path sinkStagingDir = tableStagingDir.getValue(); // Prepare collection of queue of hfiles to be loaded(replicated) Deque queue = new LinkedList<>(); - BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false, - false); + BulkLoadHFilesTool.prepareHFileQueue(conf, connection, sinkTableName, sinkStagingDir, queue, + false, false); if (queue.isEmpty()) { - LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri()); + LOG.warn("Did not find any files to replicate in directory {}", sinkStagingDir.toUri()); return null; } fsDelegationToken.acquireDelegationToken(sinkFs); try { - doBulkLoad(conf, tableName, stagingDir, queue, maxRetries); + doBulkLoad(conf, sinkTableName, sinkStagingDir, 
queue, maxRetries); } finally { - cleanup(stagingDir); + cleanup(sinkStagingDir); } } return null; @@ -194,12 +197,12 @@ private void cleanup(Path stagingDir) { // Do not close the file system } - private Map copyHFilesToStagingDir() throws IOException { + private Map copySourceHFilesToSinkStagingDir() throws IOException { Map mapOfCopiedHFiles = new HashMap<>(); Pair> familyHFilePathsPair; List hfilePaths; byte[] family; - Path familyStagingDir; + Path sinkFamilyStagingDir; int familyHFilePathsPairsListSize; int totalNoOfHFiles; List>> familyHFilePathsPairsList; @@ -224,32 +227,33 @@ private Map copyHFilesToStagingDir() throws IOException { // For each table name in the map for (Entry>>> tableEntry : bulkLoadHFileMap .entrySet()) { - String tableName = tableEntry.getKey(); + String tableNameStr = tableEntry.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); // Create staging directory for each table - Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName)); + Path sinkStagingDir = createSinkStagingDir(hbaseStagingDir, user, tableName); familyHFilePathsPairsList = tableEntry.getValue(); familyHFilePathsPairsListSize = familyHFilePathsPairsList.size(); - // For each list of family hfile paths pair in the table + // For each (family, hfile paths) pair in the table for (int i = 0; i < familyHFilePathsPairsListSize; i++) { familyHFilePathsPair = familyHFilePathsPairsList.get(i); family = familyHFilePathsPair.getFirst(); hfilePaths = familyHFilePathsPair.getSecond(); - familyStagingDir = new Path(stagingDir, Bytes.toString(family)); + sinkFamilyStagingDir = getSinkFamilyStagingDir(sinkStagingDir, tableName, family); totalNoOfHFiles = hfilePaths.size(); - // For each list of hfile paths for the family + // For each hfile path in the family List> futures = new ArrayList<>(); Callable c; Future future; int currentCopied = 0; - // Copy the hfiles parallely + // Copy the hfiles in parallel while (totalNoOfHFiles > currentCopied 
+ this.copiesPerThread) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread)); future = exec.submit(c); futures.add(future); @@ -258,7 +262,7 @@ private Map copyHFilesToStagingDir() throws IOException { int remaining = totalNoOfHFiles - currentCopied; if (remaining > 0) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + remaining)); future = exec.submit(c); futures.add(future); @@ -281,7 +285,7 @@ private Map copyHFilesToStagingDir() throws IOException { } // Add the staging directory to this table. Staging directory contains all the hfiles // belonging to this table - mapOfCopiedHFiles.put(tableName, stagingDir); + mapOfCopiedHFiles.put(tableNameStr, sinkStagingDir); } return mapOfCopiedHFiles; } finally { @@ -294,12 +298,14 @@ private Map copyHFilesToStagingDir() throws IOException { } } - private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException { - String tblName = tableName.getNameAsString().replace(":", UNDERSCORE); + private Path createSinkStagingDir(Path baseDir, User user, TableName tableName) + throws IOException { + TableName sinkTableName = translator.getSinkTableName(tableName); + String sinkTableNameStr = sinkTableName.getNameAsString().replace(":", UNDERSCORE); int RANDOM_WIDTH = 320; int RANDOM_RADIX = 32; String doubleUnderScore = UNDERSCORE + UNDERSCORE; - String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore + String randomDir = user.getShortName() + doubleUnderScore + sinkTableNameStr + doubleUnderScore + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX)); return createStagingDir(baseDir, user, randomDir); } @@ -311,50 +317,55 @@ private Path createStagingDir(Path baseDir, User user, String randomDir) throws return p; } + private Path 
getSinkFamilyStagingDir(Path baseDir, TableName tableName, byte[] family) { + byte[] sinkFamily = translator.getSinkFamily(tableName, family); + return new Path(baseDir, Bytes.toString(sinkFamily)); + } + /** * This class will copy the given hfiles from the given source file system to the given local file * system staging directory. */ private class Copier implements Callable { private FileSystem sourceFs; - private Path stagingDir; - private List hfiles; + private Path sinkStagingDir; + private List hfilePaths; - public Copier(FileSystem sourceFs, final Path stagingDir, final List hfiles) + public Copier(FileSystem sourceFs, final Path sinkStagingDir, final List hfilePaths) throws IOException { this.sourceFs = sourceFs; - this.stagingDir = stagingDir; - this.hfiles = hfiles; + this.sinkStagingDir = sinkStagingDir; + this.hfilePaths = hfilePaths; } @Override public Void call() throws IOException { Path sourceHFilePath; - Path localHFilePath; - int totalHFiles = hfiles.size(); + Path sinkHFilePath; + int totalHFiles = hfilePaths.size(); for (int i = 0; i < totalHFiles; i++) { - sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i)); - localHFilePath = new Path(stagingDir, sourceHFilePath.getName()); + sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfilePaths.get(i)); + sinkHFilePath = new Path(sinkStagingDir, sourceHFilePath.getName()); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); // If any other exception other than FNFE then we will fail the replication requests and // source will retry to replicate these data. } catch (FileNotFoundException e) { - LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". 
Trying to copy from hfile archive directory.", e); - sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i)); + sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfilePaths.get(i)); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); } catch (FileNotFoundException e1) { // This will mean that the hfile does not exists any where in source cluster FS. So we // cannot do anything here just log and continue. - LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". Hence ignoring this hfile from replication..", e1); continue; } } - sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS); + sinkFs.setPermission(sinkHFilePath, PERM_ALL_ACCESS); } return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java new file mode 100644 index 000000000000..cf1496eddf36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public class IdentityReplicationSinkTranslator implements ReplicationSinkTranslator { + @Override + public TableName getSinkTableName(TableName tableName) { + return tableName; + } + + @Override + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) { + return rowKey; + } + + @Override + public byte[] getSinkFamily(TableName tableName, byte[] family) { + return family; + } + + @Override + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) { + return qualifier; + } + + @Override + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) { + return cell; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 508ace390565..81689a457133 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -72,6 +72,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -181,6 +182,19 @@ private void decorateConf() { } } + private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { + Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, null); + ReplicationSinkTranslator translator = null; + try { + translator = translatorClass == null + ? new IdentityReplicationSinkTranslator() + : (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + LOG.warn("Failed to instantiate " + translatorClass); + } + return translator; + } + /** * Replicate this array of entries directly into the local cluster using the native client. Only * operates against raw protobuf type saving on a conversion from pb to pojo. @@ -202,18 +216,22 @@ public void replicateEntries(List entries, final ExtendedCellScanner c // Very simple optimization where we batch sequences of rows going // to the same table. try { + ReplicationSinkTranslator translator = getReplicationSinkTranslator(); long totalReplicated = 0; - // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per - // invocation of this method per table and cluster id. - Map, List>> rowMap = new TreeMap<>(); + Map, Map>>>> bulkLoadsPerClusters = + new HashMap<>(); - Map, Map>>>> bulkLoadsPerClusters = null; - Pair, List> mutationsToWalEntriesPairs = + // Map of tableName => list of Rows, grouped by source cluster id. 
+ // In each call to this method, we only want to flushCommits once per table per source + // clusterq + Map, List>> sinkRowMap = new TreeMap<>(); + Pair, List> sinkMutationsToWalEntriesPairs = new Pair<>(new ArrayList<>(), new ArrayList<>()); + for (WALEntry entry : entries) { - TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName tableName = TableName.valueOf(entry.getKey().getTableName().toByteArray()); if (this.walEntrySinkFilter != null) { - if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { + if (this.walEntrySinkFilter.filter(tableName, entry.getKey().getWriteTime())) { // Skip Cells in CellScanner associated with this entry. int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { @@ -226,8 +244,8 @@ public void replicateEntries(List entries, final ExtendedCellScanner c continue; } } - ExtendedCell previousCell = null; - Mutation mutation = null; + ExtendedCell sinkCellPrev = null; + Mutation sinkMutation = null; int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off @@ -236,81 +254,71 @@ public void replicateEntries(List entries, final ExtendedCellScanner c throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } ExtendedCell cell = cells.current(); - // Handle bulk load hfiles replication if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + // Bulk load events BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); if (bld.getReplicate()) { - if (bulkLoadsPerClusters == null) { - bulkLoadsPerClusters = new HashMap<>(); - } - // Map of table name Vs list of pair of family and list of - // hfile paths from its namespace + // Map of tableNameStr to (family, hfile paths) pairs Map>>> bulkLoadHFileMap = bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); - buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); + 
buildBulkLoadHFileMap(bulkLoadHFileMap, bld); } } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { Mutation put = processReplicationMarkerEntry(cell); if (put == null) { continue; } - table = REPLICATION_SINK_TRACKER_TABLE_NAME; - List clusterIds = new ArrayList<>(); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } + List clusterIds = getSourceClusterIds(entry); put.setClusterIds(clusterIds); - addToHashMultiMap(rowMap, table, clusterIds, put); + addToHashMultiMap(sinkRowMap, REPLICATION_SINK_TRACKER_TABLE_NAME, clusterIds, put); } else { - // Handle wal replication - if (isNewRowOrType(previousCell, cell)) { - // Create new mutation - mutation = CellUtil.isDelete(cell) - ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } - mutation.setClusterIds(clusterIds); - mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, + TableName sinkTableName = translator.getSinkTableName(tableName); + ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName, cell); + if (isNewRowOrType(sinkCellPrev, sinkCell)) { + sinkMutation = CellUtil.isDelete(sinkCell) + ? 
new Delete(sinkCell.getRowArray(), sinkCell.getRowOffset(), + sinkCell.getRowLength()) + : new Put(sinkCell.getRowArray(), sinkCell.getRowOffset(), sinkCell.getRowLength()); + List clusterIds = getSourceClusterIds(entry); + sinkMutation.setClusterIds(clusterIds); + sinkMutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY); if (rsServerHost != null) { - rsServerHost.preReplicationSinkBatchMutate(entry, mutation); - mutationsToWalEntriesPairs.getFirst().add(mutation); - mutationsToWalEntriesPairs.getSecond().add(entry); + rsServerHost.preReplicationSinkBatchMutate(entry, sinkMutation); + sinkMutationsToWalEntriesPairs.getFirst().add(sinkMutation); + sinkMutationsToWalEntriesPairs.getSecond().add(entry); } - addToHashMultiMap(rowMap, table, clusterIds, mutation); + addToHashMultiMap(sinkRowMap, sinkTableName, clusterIds, sinkMutation); } - if (CellUtil.isDelete(cell)) { - ((Delete) mutation).add(cell); + if (CellUtil.isDelete(sinkCell)) { + ((Delete) sinkMutation).add(sinkCell); } else { - ((Put) mutation).add(cell); + ((Put) sinkMutation).add(sinkCell); } - previousCell = cell; + sinkCellPrev = sinkCell; } } totalReplicated++; } // TODO Replicating mutations and bulk loaded data can be made parallel - if (!rowMap.isEmpty()) { + if (!sinkRowMap.isEmpty()) { LOG.debug("Started replicating mutations."); - for (Entry, List>> entry : rowMap.entrySet()) { + for (Entry, List>> entry : sinkRowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold); } LOG.debug("Finished replicating mutations."); - } - if (rsServerHost != null) { - List mutations = mutationsToWalEntriesPairs.getFirst(); - List walEntries = mutationsToWalEntriesPairs.getSecond(); - for (int i = 0; i < mutations.size(); i++) { - rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i)); + if (rsServerHost != null) { + List sinkMutations = sinkMutationsToWalEntriesPairs.getFirst(); + List walEntries = 
sinkMutationsToWalEntriesPairs.getSecond(); + for (int i = 0; i < sinkMutations.size(); i++) { + rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), sinkMutations.get(i)); + } } } - if (bulkLoadsPerClusters != null) { + if (!bulkLoadsPerClusters.isEmpty()) { for (Entry, Map>>>> entry : bulkLoadsPerClusters.entrySet()) { Map>>> bulkLoadHFileMap = entry.getValue(); @@ -319,7 +327,7 @@ public void replicateEntries(List entries, final ExtendedCellScanner c Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId); try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf, sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, - getConnection(), entry.getKey())) { + getConnection(), entry.getKey(), translator)) { hFileReplicator.replicate(); LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString()); } @@ -338,6 +346,14 @@ public void replicateEntries(List entries, final ExtendedCellScanner c } } + private List getSourceClusterIds(WALEntry entry) { + List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + return clusterIds; + } + /* * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. * If false, then ignore this cell. 
If set to true, de-serialize value into @@ -366,11 +382,10 @@ private Put processReplicationMarkerEntry(Cell cell) throws IOException { } private void buildBulkLoadHFileMap( - final Map>>> bulkLoadHFileMap, TableName table, - BulkLoadDescriptor bld) throws IOException { + final Map>>> bulkLoadHFileMap, BulkLoadDescriptor bld) + throws IOException { List storesList = bld.getStoresList(); - int storesSize = storesList.size(); - for (int j = 0; j < storesSize; j++) { + for (int j = 0; j < storesList.size(); j++) { StoreDescriptor storeDescriptor = storesList.get(j); List storeFileList = storeDescriptor.getStoreFileList(); int storeFilesSize = storeFileList.size(); @@ -378,10 +393,11 @@ private void buildBulkLoadHFileMap( for (int k = 0; k < storeFilesSize; k++) { byte[] family = storeDescriptor.getFamilyName().toByteArray(); - // Build hfile relative path from its namespace - String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family); - String tableName = table.getNameWithNamespaceInclAsString(); - List>> familyHFilePathsList = bulkLoadHFileMap.get(tableName); + // Build relative hfile path starting with its namespace dir + TableName tableName = ProtobufUtil.toTableName(bld.getTableName()); + String pathToHfileFromNS = getHFilePath(tableName, bld, storeFileList.get(k), family); + String tableNameStr = tableName.getNameWithNamespaceInclAsString(); + List>> familyHFilePathsList = bulkLoadHFileMap.get(tableNameStr); if (familyHFilePathsList != null) { boolean foundFamily = false; for (Pair> familyHFilePathsPair : familyHFilePathsList) { @@ -393,12 +409,12 @@ private void buildBulkLoadHFileMap( } } if (!foundFamily) { - // Family not found, add this family and its hfile paths pair to the list + // Family not found, add this (family, hfile paths) pair to the list addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList); } } else { // Add this table entry into the map - addNewTableEntryInMap(bulkLoadHFileMap, family, 
pathToHfileFromNS, tableName); + addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableNameStr); } } } @@ -422,10 +438,10 @@ private void addNewTableEntryInMap( bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList); } - private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile, + private String getHFilePath(TableName tableName, BulkLoadDescriptor bld, String storeFile, byte[] family) { - return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR) - .append(table.getQualifierAsString()).append(Path.SEPARATOR) + return new StringBuilder(100).append(tableName.getNamespaceAsString()).append(Path.SEPARATOR) + .append(tableName.getQualifierAsString()).append(Path.SEPARATOR) .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR) .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java new file mode 100644 index 000000000000..5b0168d51b81 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public interface ReplicationSinkTranslator { + + public TableName getSinkTableName(TableName tableName); + + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey); + + public byte[] getSinkFamily(TableName tableName, byte[] family); + + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier); + + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell); +} From 8f3bc83e32fe0933050d8c2bfbcc618fa32b3e9f Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Mon, 6 Jan 2025 11:51:23 -0500 Subject: [PATCH 2/9] Do not swallow RuntimeExceptions --- .../regionserver/ReplicationSink.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 81689a457133..d48851d614d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -25,7 +25,6 @@ import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; import static
org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; @@ -69,9 +68,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -140,6 +137,8 @@ public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerH Class c = Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); this.provider = c.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { throw new IllegalArgumentException( "Configured source fs configuration provider class " + className + " throws error.", e); @@ -151,9 +150,11 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException { this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null); WALEntrySinkFilter filter = null; try { - filter = walEntryFilterClass == null - ? null - : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); + filter = walEntryFilterClass == null ? 
+ null : + (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { LOG.warn("Failed to instantiate " + walEntryFilterClass); } @@ -183,12 +184,14 @@ private void decorateConf() { } private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { - Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, null); + Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, IdentityReplicationSinkTranslator.class); ReplicationSinkTranslator translator = null; try { - translator = translatorClass == null - ? new IdentityReplicationSinkTranslator() - : (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + translator = translatorClass == null ? + null : + (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { LOG.warn("Failed to instantiate " + translatorClass); } From 5cd3c20bae689566ea76132a4841af64c5ee7149 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Mon, 6 Jan 2025 12:51:39 -0500 Subject: [PATCH 3/9] Spotless:apply --- .../regionserver/ReplicationSink.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index d48851d614d2..3c3a5bd1bbab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; import static 
org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; @@ -68,7 +69,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -150,9 +153,9 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException { this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null); WALEntrySinkFilter filter = null; try { - filter = walEntryFilterClass == null ? - null : - (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); + filter = walEntryFilterClass == null + ? null + : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -184,12 +187,13 @@ private void decorateConf() { } private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { - Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, IdentityReplicationSinkTranslator.class); + Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, + IdentityReplicationSinkTranslator.class); ReplicationSinkTranslator translator = null; try { - translator = translatorClass == null ? - null : - (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + translator = translatorClass == null + ? 
null + : (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { From 60ebfdca389b364c18e0bf968881ea40cd1d39cd Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Mon, 6 Jan 2025 13:02:29 -0500 Subject: [PATCH 4/9] Fix copy/paste bug --- .../src/main/java/org/apache/hadoop/hbase/HConstants.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 2eb28b56f294..878a1a25a967 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase; import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH; - import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; @@ -970,7 +969,7 @@ public enum OperationStatusCode { public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator"; public static final String REPLICATION_SINK_TRANSLATOR_DEFAULT = - "org.apache.hadoop.hbase.replication.regionserver.DefaultReplicationSinkTranslator"; + "org.apache.hadoop.hbase.replication.regionserver.IdentityReplicationSinkTranslator"; public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; From 0448f31e4577dbdfbbc10cd910b9a3016067be31 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Wed, 8 Jan 2025 14:35:48 -0500 Subject: [PATCH 5/9] Add tests for translations --- .../org/apache/hadoop/hbase/HConstants.java | 1 + .../regionserver/TestReplicationSink.java | 
296 ++++++++++++++---- 2 files changed, 232 insertions(+), 65 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 878a1a25a967..602e7a36d4c1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase; import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH; + import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index bdb51ebe36f8..1d95cafcd932 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.ExtendedCellBuilder; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; @@ -72,9 +74,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - import 
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; @@ -97,11 +97,14 @@ public class TestReplicationSink { protected static final TableName TABLE_NAME1 = TableName.valueOf("table1"); protected static final TableName TABLE_NAME2 = TableName.valueOf("table2"); + protected static final TableName TABLE_NAME3 = TableName.valueOf("table3"); + protected static final TableName TABLE_NAME4 = TableName.valueOf("table4"); + protected static final TableName TABLE_NAME5 = TableName.valueOf("table5"); + protected static final TableName TABLE_NAME6 = TableName.valueOf("table6"); protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); - protected static Table table1; protected static Stoppable STOPPABLE = new Stoppable() { final AtomicBoolean stop = new AtomicBoolean(false); @@ -118,7 +121,12 @@ public void stop(String why) { }; + protected static Table table1; protected static Table table2; + protected static Table table3; + protected static Table table4; + protected static Table table5; + protected static Table table6; protected static String baseNamespaceDir; protected static String hfileArchiveDir; protected static String replicationClusterId; @@ -130,12 +138,18 @@ public void stop(String why) { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); + TEST_UTIL.getConfiguration().setClass(HConstants.REPLICATION_SINK_TRANSLATOR, + TestReplicationSinkTranslator.class, ReplicationSinkTranslator.class); TEST_UTIL.startMiniCluster(3); RegionServerCoprocessorHost rsCpHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost(); SINK = new ReplicationSink(new 
Configuration(TEST_UTIL.getConfiguration()), rsCpHost); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); + table3 = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAME1); + table4 = TEST_UTIL.createTable(TABLE_NAME4, FAM_NAME2); + table5 = TEST_UTIL.createTable(TABLE_NAME5, FAM_NAME1); + table6 = TEST_UTIL.createTable(TABLE_NAME6, FAM_NAME1); Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString(); hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString(); @@ -158,34 +172,46 @@ public static void tearDownAfterClass() throws Exception { public void setUp() throws Exception { table1 = TEST_UTIL.deleteTableData(TABLE_NAME1); table2 = TEST_UTIL.deleteTableData(TABLE_NAME2); + table3 = TEST_UTIL.deleteTableData(TABLE_NAME3); + table4 = TEST_UTIL.deleteTableData(TABLE_NAME4); + table5 = TEST_UTIL.deleteTableData(TABLE_NAME5); + table6 = TEST_UTIL.deleteTableData(TABLE_NAME6); } /** * Insert a whole batch of entries */ - @Test - public void testBatchSink() throws Exception { + public void testBatchSink(TableName tableName, Table sinkTable) throws Exception { List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); } + @Test + public void testBatchSink() throws Exception { + testBatchSink(TABLE_NAME1, table1); + } + + @Test + public void 
testBatchSinkWithTranslation() throws Exception { + testBatchSink(TABLE_NAME3, table4); + } + /** * Insert a mix of puts and deletes */ - @Test - public void testMixedPutDelete() throws Exception { + public void testMixedPutDelete(TableName tableName, Table sinkTable) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE / 2; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -193,28 +219,37 @@ public void testMixedPutDelete() throws Exception { entries = new ArrayList<>(BATCH_SIZE); cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, + entries.add(createEntry(tableName, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); } @Test - public void testLargeEditsPutDelete() throws Exception { + public void testMixedPutDelete() throws Exception { + testMixedPutDelete(TABLE_NAME1, table1); + } + + @Test + public void testMixedPutDeleteWithTranslation() throws Exception { + testMixedPutDelete(TABLE_NAME3, table4); + } + + public void testLargeEditsPutDelete(TableName tableName, Table sinkTable) throws Exception { List entries = new ArrayList<>(); List cells = new ArrayList<>(); for (int i = 0; i < 5510; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } 
SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - ResultScanner resultScanner = table1.getScanner(new Scan()); + ResultScanner resultScanner = sinkTable.getScanner(new Scan()); int totalRows = 0; while (resultScanner.next() != null) { totalRows++; @@ -224,12 +259,12 @@ public void testLargeEditsPutDelete() throws Exception { entries = new ArrayList<>(); cells = new ArrayList<>(); for (int i = 0; i < 11000; i++) { - entries.add(createEntry(TABLE_NAME1, i, + entries.add(createEntry(tableName, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - resultScanner = table1.getScanner(new Scan()); + resultScanner = sinkTable.getScanner(new Scan()); totalRows = 0; while (resultScanner.next() != null) { totalRows++; @@ -237,80 +272,120 @@ public void testLargeEditsPutDelete() throws Exception { assertEquals(5500, totalRows); } + @Test + public void testLargeEditsPutDelete() throws Exception { + testLargeEditsPutDelete(TABLE_NAME1, table1); + } + + @Test + public void testLargeEditsPutDeleteWithTranslation() throws Exception { + testLargeEditsPutDelete(TABLE_NAME3, table4); + } + /** * Insert to 2 different tables */ - @Test - public void testMixedPutTables() throws Exception { + public void testMixedPutTables(TableName tableName1, Table sinkTable1, TableName tableName2, + Table sinkTable2) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(i % 2 == 0 ? 
tableName2 : tableName1, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table2.getScanner(scan); + ResultScanner scanRes = sinkTable2.getScanner(scan); for (Result res : scanRes) { assertEquals(0, Bytes.toInt(res.getRow()) % 2); } - scanRes = table1.getScanner(scan); + scanRes = sinkTable1.getScanner(scan); for (Result res : scanRes) { assertEquals(1, Bytes.toInt(res.getRow()) % 2); } } + @Test + public void testMixedPutTables() throws Exception { + testMixedPutTables(TABLE_NAME1, table1, TABLE_NAME2, table2); + } + + @Test + public void testMixedPutTablesWithTranslation() throws Exception { + testMixedPutTables(TABLE_NAME3, table4, TABLE_NAME2, table2); + } + /** * Insert then do different types of deletes */ - @Test - public void testMixedDeletes() throws Exception { + public void testMixedDeletes(TableName tableName, Table sinkTable) throws Exception { List entries = new ArrayList<>(3); List cells = new ArrayList<>(); for (int i = 0; i < 3; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(3); cells = new ArrayList<>(); - entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); - entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); - entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, 0, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, 1, KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, 2, KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, 
PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(0, scanRes.next(3).length); } + @Test + public void testMixedDeletes() throws Exception { + testMixedDeletes(TABLE_NAME1, table1); + } + + @Test + public void testMixedDeletesWithTranslation() throws Exception { + testMixedDeletes(TABLE_NAME3, table4); + } + /** * Puts are buffered, but this tests when a delete (not-buffered) is applied before the actual Put * that creates it. */ - @Test - public void testApplyDeleteBeforePut() throws Exception { + public void testApplyDeleteBeforePut(TableName tableName, Table sinkTable) throws Exception { List entries = new ArrayList<>(5); List cells = new ArrayList<>(); for (int i = 0; i < 2; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } - entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); + int deleteRow = 1; + entries.add(createEntry(tableName, deleteRow, KeyValue.Type.DeleteFamily, cells)); for (int i = 3; i < 5; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - Get get = new Get(Bytes.toBytes(1)); - Result res = table1.get(get); + ReplicationSinkTranslator translator = new TestReplicationSinkTranslator(); + Get get = new Get(translator.getSinkRowKey(tableName, Bytes.toBytes(deleteRow))); + Result res = sinkTable.get(get); assertEquals(0, res.size()); } @Test - public void testRethrowRetriesExhaustedException() throws Exception { + public void testApplyDeleteBeforePut() throws Exception { + 
testApplyDeleteBeforePut(TABLE_NAME1, table1); + } + + @Test + public void testApplyDeleteBeforePutWithTranslation() throws Exception { + testApplyDeleteBeforePut(TABLE_NAME3, table4); + } + + public void testRethrowRetriesExhaustedException(TableName tableName, TableName sinkTableName) + throws Exception { TableName notExistTable = TableName.valueOf("notExistTable"); List entries = new ArrayList<>(); List cells = new ArrayList<>(); @@ -326,11 +401,11 @@ public void testRethrowRetriesExhaustedException() throws Exception { entries.clear(); cells.clear(); for (int i = 0; i < 10; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = conn.getAdmin()) { - admin.disableTable(TABLE_NAME1); + admin.disableTable(sinkTableName); try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, @@ -338,17 +413,27 @@ public void testRethrowRetriesExhaustedException() throws Exception { Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); } catch (RetriesExhaustedException e) { } finally { - admin.enableTable(TABLE_NAME1); + admin.enableTable(sinkTableName); } } } } + @Test + public void testRethrowRetriesExhaustedException() throws Exception { + testRethrowRetriesExhaustedException(TABLE_NAME1, TABLE_NAME1); + } + + @Test + public void testRethrowRetriesExhaustedExceptionWithTranslation() throws Exception { + testRethrowRetriesExhaustedException(TABLE_NAME3, TABLE_NAME4); + } + /** * Test replicateEntries with a bulk load entry for 25 HFiles */ - @Test - public void testReplicateEntriesForHFiles() throws Exception { + public void testReplicateEntriesForHFiles(TableName tableName, byte[] family, Table sinkTable) + throws Exception { Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries"); Path 
familyDir = new Path(dir, Bytes.toString(FAM_NAME1)); int numRows = 10; @@ -371,7 +456,7 @@ public void testReplicateEntriesForHFiles() throws Exception { Iterator numbersItr = numberList.iterator(); for (int i = 0; i < 25; i++) { Path hfilePath = new Path(familyDir, hfilePrefix + i); - HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1, + HFileTestUtil.createHFile(conf, fs, hfilePath, family, family, Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows); p.add(hfilePath); storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen()); @@ -379,14 +464,14 @@ public void testReplicateEntriesForHFiles() throws Exception { // 3. Create a BulkLoadDescriptor and a WALEdit Map> storeFiles = new HashMap<>(1); - storeFiles.put(FAM_NAME1, p); + storeFiles.put(family, p); org.apache.hadoop.hbase.wal.WALEdit edit = null; WALProtos.BulkLoadDescriptor loadDescriptor = null; try (Connection c = ConnectionFactory.createConnection(conf); - RegionLocator l = c.getRegionLocator(TABLE_NAME1)) { + RegionLocator l = c.getRegionLocator(tableName)) { RegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegion(); - loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, + loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(tableName, UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); @@ -394,12 +479,12 @@ public void testReplicateEntriesForHFiles() throws Exception { List entries = new ArrayList<>(1); // 4. Create a WALEntryBuilder - WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1); + WALEntry.Builder builder = createWALEntryBuilder(tableName); // 5. 
Copy the hfile to the path as it is in reality for (int i = 0; i < 25; i++) { - String pathToHfileFromNS = new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()) - .append(Path.SEPARATOR).append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR) + String pathToHfileFromNS = new StringBuilder(100).append(tableName.getNamespaceAsString()) + .append(Path.SEPARATOR).append(Bytes.toString(tableName.getName())).append(Path.SEPARATOR) .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray())) .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR) .append(hfilePrefix + i).toString(); @@ -409,7 +494,7 @@ public void testReplicateEntriesForHFiles() throws Exception { } entries.add(builder.build()); - try (ResultScanner scanner = table1.getScanner(new Scan())) { + try (ResultScanner scanner = sinkTable.getScanner(new Scan())) { // 6. Assert no existing data in table assertEquals(0, scanner.next(numRows).length); } @@ -418,24 +503,34 @@ public void testReplicateEntriesForHFiles() throws Exception { PrivateCellUtil .createExtendedCellScanner(WALEditInternalHelper.getExtendedCells(edit).iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - try (ResultScanner scanner = table1.getScanner(new Scan())) { + try (ResultScanner scanner = sinkTable.getScanner(new Scan())) { // 8. 
Assert data is replicated assertEquals(numRows, scanner.next(numRows).length); } - // Clean up the created hfiles or it will mess up subsequent tests + // Clean up the created hfiles, or it will mess up subsequent tests + } + + @Test + public void testReplicateEntriesForHFiles() throws Exception { + testReplicateEntriesForHFiles(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testReplicateEntriesForHFilesWithTranslation() throws Exception { + testReplicateEntriesForHFiles(TABLE_NAME5, FAM_NAME1, table6); } /** * Test failure metrics produced for failed replication edits */ - @Test - public void testFailedReplicationSinkMetrics() throws IOException { + public void testFailedReplicationSinkMetrics(TableName tableName, TableName sinkTableName) + throws IOException { long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); long errorCount = 0L; List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } cells.clear(); // cause IndexOutOfBoundsException try { @@ -465,12 +560,12 @@ public void testFailedReplicationSinkMetrics() throws IOException { entries.clear(); cells.clear(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); } // cause IOException in batch() try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = conn.getAdmin()) { - admin.disableTable(TABLE_NAME1); + admin.disableTable(sinkTableName); try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, @@ -480,16 +575,33 @@ public void testFailedReplicationSinkMetrics() throws IOException { errorCount++; assertEquals(initialFailedBatches + errorCount, 
SINK.getSinkMetrics().getFailedBatches()); } finally { - admin.enableTable(TABLE_NAME1); + admin.enableTable(sinkTableName); } } } } + @Test + public void testFailedReplicationSinkMetrics() throws IOException { + testFailedReplicationSinkMetrics(TABLE_NAME1, TABLE_NAME1); + } + + @Test + public void testFailedReplicationSinkMetricsWithTranslation() throws IOException { + testFailedReplicationSinkMetrics(TABLE_NAME3, TABLE_NAME4); + } + private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List cells) { - byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row); + byte[] fam = table.equals(TABLE_NAME2) || table.equals(TABLE_NAME4) ? FAM_NAME2 : FAM_NAME1; + byte[] qualifier = type.getCode() == KeyValue.Type.DeleteFamily.getCode() ? null : fam; + byte[] value = type.getCode() == KeyValue.Type.Put.getCode() ? rowBytes : null; + return createEntry(table, rowBytes, fam, qualifier, value, type, cells); + } + + private WALEntry createEntry(TableName table, byte[] row, byte[] family, byte[] qualifier, + byte[] value, KeyValue.Type type, List cells) { // Just make sure we don't get the same ts for two consecutive rows with // same key try { @@ -500,11 +612,11 @@ private WALEntry createEntry(TableName table, int row, KeyValue.Type type, final long now = EnvironmentEdgeManager.currentTime(); KeyValue kv = null; if (type.getCode() == KeyValue.Type.Put.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); + kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.Put, value); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); + kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { - kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); + kv = new KeyValue(row, family, 
qualifier, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = createWALEntryBuilder(table); cells.add(kv); @@ -512,7 +624,7 @@ private WALEntry createEntry(TableName table, int row, KeyValue.Type type, return builder.build(); } - public static WALEntry.Builder createWALEntryBuilder(TableName table) { + public static WALEntry.Builder createWALEntryBuilder(TableName tableName) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); @@ -520,11 +632,65 @@ public static WALEntry.Builder createWALEntryBuilder(TableName table) { uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); - keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName())); + keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(tableName.getName())); keyBuilder.setWriteTime(EnvironmentEdgeManager.currentTime()); keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); return builder; } + + public static class TestReplicationSinkTranslator implements ReplicationSinkTranslator { + + private static final ExtendedCellBuilder EXTENDED_CELL_BUILDER = + ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); + + @Override + public TableName getSinkTableName(TableName tableName) { + if (tableName.equals(TABLE_NAME3)) { + return TABLE_NAME4; + } else if (tableName.equals(TABLE_NAME5)) { + return TABLE_NAME6; + } + return tableName; + } + + @Override + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) { + return rowKey; + } + + @Override + public byte[] getSinkFamily(TableName tableName, byte[] family) { + if (tableName.equals(TABLE_NAME3) && Bytes.equals(family, FAM_NAME1)) { + return FAM_NAME2; + } + return family; 
+ } + + @Override + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) { + return qualifier; + } + + @Override + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) { + if ( + tableName.equals(TABLE_NAME3) && Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), FAM_NAME1, 0, FAM_NAME1.length) + ) { + int tagsLen = cell.getTagsLength(); + LOG.info("tagsLen: {}", tagsLen); + return EXTENDED_CELL_BUILDER.clear() + .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).setFamily(FAM_NAME2) + .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()) + .setTimestamp(cell.getTimestamp()) + .setTags(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()) + .setSequenceId(cell.getSequenceId()).setType(cell.getType()) + .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()).build(); + } + return cell; + } + } } From 33369d13aa744d0377913427ce3bc85333dbd4d3 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Wed, 8 Jan 2025 14:39:49 -0500 Subject: [PATCH 6/9] Remove debugging code --- .../hbase/replication/regionserver/TestReplicationSink.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 1d95cafcd932..d1291ac8a3bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -679,8 +679,6 @@ public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) tableName.equals(TABLE_NAME3) && Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), FAM_NAME1, 0, FAM_NAME1.length) ) { - int 
tagsLen = cell.getTagsLength(); - LOG.info("tagsLen: {}", tagsLen); return EXTENDED_CELL_BUILDER.clear() .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).setFamily(FAM_NAME2) .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), From 2c7bc92aedd326fcd0ed7e7d8b6514ec03de1a1e Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Wed, 8 Jan 2025 16:02:31 -0500 Subject: [PATCH 7/9] Add tests to ensure translations are getting applied to cell parts --- .../regionserver/TestReplicationSink.java | 164 ++++++++++++------ 1 file changed, 110 insertions(+), 54 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index d1291ac8a3bd..f43237efd9a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -105,6 +107,8 @@ public class TestReplicationSink { protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); + protected static final TestReplicationSinkTranslator TRANSLATOR = new TestReplicationSinkTranslator(); + protected static Stoppable STOPPABLE = new Stoppable() { final AtomicBoolean stop = new AtomicBoolean(false); @@ -181,37 +185,44 @@ public void setUp() throws Exception { /** * Insert a whole batch of entries */ - public void testBatchSink(TableName tableName, Table sinkTable) throws Exception { + public void 
testBatchSink(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); + validatePuts(tableName, rows, family, family, sinkTable); } @Test public void testBatchSink() throws Exception { - testBatchSink(TABLE_NAME1, table1); + testBatchSink(TABLE_NAME1, FAM_NAME1, table1); } @Test public void testBatchSinkWithTranslation() throws Exception { - testBatchSink(TABLE_NAME3, table4); + testBatchSink(TABLE_NAME3, FAM_NAME1, table4); } /** * Insert a mix of puts and deletes */ - public void testMixedPutDelete(TableName tableName, Table sinkTable) throws Exception { + public void testMixedPutDelete(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE / 2; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -219,8 +230,14 @@ public void testMixedPutDelete(TableName tableName, Table sinkTable) throws Exce entries = new ArrayList<>(BATCH_SIZE); cells = new ArrayList<>(); for 
(int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(tableName, i, - i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 != 0) { + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); + } else { + entries.add(createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + rows.remove(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -228,23 +245,27 @@ public void testMixedPutDelete(TableName tableName, Table sinkTable) throws Exce Scan scan = new Scan(); ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); + validatePuts(tableName, rows, family, family, sinkTable); } @Test public void testMixedPutDelete() throws Exception { - testMixedPutDelete(TABLE_NAME1, table1); + testMixedPutDelete(TABLE_NAME1, FAM_NAME1, table1); } @Test public void testMixedPutDeleteWithTranslation() throws Exception { - testMixedPutDelete(TABLE_NAME3, table4); + testMixedPutDelete(TABLE_NAME3, FAM_NAME1, table4); } - public void testLargeEditsPutDelete(TableName tableName, Table sinkTable) throws Exception { + public void testLargeEditsPutDelete(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < 5510; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -259,8 +280,14 @@ public void testLargeEditsPutDelete(TableName tableName, Table sinkTable) throws entries = new ArrayList<>(); cells = new 
ArrayList<>(); for (int i = 0; i < 11000; i++) { - entries.add(createEntry(tableName, i, - i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 != 0) { + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); + } else { + entries.add(createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + rows.remove(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -270,27 +297,37 @@ public void testLargeEditsPutDelete(TableName tableName, Table sinkTable) throws totalRows++; } assertEquals(5500, totalRows); + validatePuts(tableName, rows, family, family, sinkTable); } @Test public void testLargeEditsPutDelete() throws Exception { - testLargeEditsPutDelete(TABLE_NAME1, table1); + testLargeEditsPutDelete(TABLE_NAME1, FAM_NAME1, table1); } @Test public void testLargeEditsPutDeleteWithTranslation() throws Exception { - testLargeEditsPutDelete(TABLE_NAME3, table4); + testLargeEditsPutDelete(TABLE_NAME3, FAM_NAME1, table4); } /** * Insert to 2 different tables */ - public void testMixedPutTables(TableName tableName1, Table sinkTable1, TableName tableName2, + public void testMixedPutTables(TableName tableName1, byte[] family1, Table sinkTable1, TableName tableName2, byte[] family2, Table sinkTable2) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); + Set rows1 = new HashSet<>(); + Set rows2 = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(i % 2 == 0 ? 
tableName2 : tableName1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 == 0) { + entries.add(createEntry(tableName2, row, family2, family2, row, KeyValue.Type.Put, cells)); + rows2.add(i); + } else { + entries.add(createEntry(tableName1, row, family1, family1, row, KeyValue.Type.Put, cells)); + rows1.add(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -308,30 +345,31 @@ public void testMixedPutTables(TableName tableName1, Table sinkTable1, TableName @Test public void testMixedPutTables() throws Exception { - testMixedPutTables(TABLE_NAME1, table1, TABLE_NAME2, table2); + testMixedPutTables(TABLE_NAME1, FAM_NAME1, table1, TABLE_NAME2, FAM_NAME2, table2); } @Test public void testMixedPutTablesWithTranslation() throws Exception { - testMixedPutTables(TABLE_NAME3, table4, TABLE_NAME2, table2); + testMixedPutTables(TABLE_NAME3, FAM_NAME1, table4, TABLE_NAME2, FAM_NAME2, table2); } /** * Insert then do different types of deletes */ - public void testMixedDeletes(TableName tableName, Table sinkTable) throws Exception { + public void testMixedDeletes(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(3); List cells = new ArrayList<>(); for (int i = 0; i < 3; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(3); cells = new ArrayList<>(); - entries.add(createEntry(tableName, 0, KeyValue.Type.DeleteColumn, cells)); - entries.add(createEntry(tableName, 1, KeyValue.Type.DeleteFamily, cells)); - entries.add(createEntry(tableName, 2, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, 
Bytes.toBytes(0), family, family, null, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(2), family, family, null, KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -343,54 +381,62 @@ public void testMixedDeletes(TableName tableName, Table sinkTable) throws Except @Test public void testMixedDeletes() throws Exception { - testMixedDeletes(TABLE_NAME1, table1); + testMixedDeletes(TABLE_NAME1, FAM_NAME1, table1); } @Test public void testMixedDeletesWithTranslation() throws Exception { - testMixedDeletes(TABLE_NAME3, table4); + testMixedDeletes(TABLE_NAME3, FAM_NAME1, table4); } /** * Puts are buffered, but this tests when a delete (not-buffered) is applied before the actual Put * that creates it. */ - public void testApplyDeleteBeforePut(TableName tableName, Table sinkTable) throws Exception { + public void testApplyDeleteBeforePut(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(5); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < 2; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } int deleteRow = 1; - entries.add(createEntry(tableName, deleteRow, KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, KeyValue.Type.DeleteFamily, cells)); + rows.remove(deleteRow); for (int i = 3; i < 5; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, 
cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - ReplicationSinkTranslator translator = new TestReplicationSinkTranslator(); - Get get = new Get(translator.getSinkRowKey(tableName, Bytes.toBytes(deleteRow))); + Get get = new Get(TRANSLATOR.getSinkRowKey(tableName, Bytes.toBytes(deleteRow))); Result res = sinkTable.get(get); assertEquals(0, res.size()); + validatePuts(tableName, rows, family, family, sinkTable); } @Test public void testApplyDeleteBeforePut() throws Exception { - testApplyDeleteBeforePut(TABLE_NAME1, table1); + testApplyDeleteBeforePut(TABLE_NAME1, FAM_NAME1, table1); } @Test public void testApplyDeleteBeforePutWithTranslation() throws Exception { - testApplyDeleteBeforePut(TABLE_NAME3, table4); + testApplyDeleteBeforePut(TABLE_NAME3, FAM_NAME1, table4); } - public void testRethrowRetriesExhaustedException(TableName tableName, TableName sinkTableName) + public void testRethrowRetriesExhaustedException(TableName tableName, byte[] family, TableName sinkTableName) throws Exception { TableName notExistTable = TableName.valueOf("notExistTable"); + byte[] notExistFamily = Bytes.toBytes("notExistFamily"); List entries = new ArrayList<>(); List cells = new ArrayList<>(); for (int i = 0; i < 10; i++) { - entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, KeyValue.Type.Put, cells)); } try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -401,7 +447,8 @@ public void testRethrowRetriesExhaustedException(TableName tableName, TableName entries.clear(); cells.clear(); for (int i = 0; i < 10; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, 
KeyValue.Type.Put, cells)); } try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = conn.getAdmin()) { @@ -421,12 +468,12 @@ public void testRethrowRetriesExhaustedException(TableName tableName, TableName @Test public void testRethrowRetriesExhaustedException() throws Exception { - testRethrowRetriesExhaustedException(TABLE_NAME1, TABLE_NAME1); + testRethrowRetriesExhaustedException(TABLE_NAME1, FAM_NAME1, TABLE_NAME1); } @Test public void testRethrowRetriesExhaustedExceptionWithTranslation() throws Exception { - testRethrowRetriesExhaustedException(TABLE_NAME3, TABLE_NAME4); + testRethrowRetriesExhaustedException(TABLE_NAME3, FAM_NAME1, TABLE_NAME4); } /** @@ -523,14 +570,17 @@ public void testReplicateEntriesForHFilesWithTranslation() throws Exception { /** * Test failure metrics produced for failed replication edits */ - public void testFailedReplicationSinkMetrics(TableName tableName, TableName sinkTableName) + public void testFailedReplicationSinkMetrics(TableName tableName, byte[] family, TableName sinkTableName) throws IOException { long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); long errorCount = 0L; List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } cells.clear(); // cause IndexOutOfBoundsException try { @@ -545,8 +595,10 @@ public void testFailedReplicationSinkMetrics(TableName tableName, TableName sink entries.clear(); cells.clear(); TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException + byte[] notExistFamily = Bytes.toBytes("notExistFamily"); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(notExistTable, i, 
KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, KeyValue.Type.Put, cells)); } try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -560,7 +612,8 @@ public void testFailedReplicationSinkMetrics(TableName tableName, TableName sink entries.clear(); cells.clear(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(tableName, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); } // cause IOException in batch() try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -583,21 +636,24 @@ public void testFailedReplicationSinkMetrics(TableName tableName, TableName sink @Test public void testFailedReplicationSinkMetrics() throws IOException { - testFailedReplicationSinkMetrics(TABLE_NAME1, TABLE_NAME1); + testFailedReplicationSinkMetrics(TABLE_NAME1, FAM_NAME1, TABLE_NAME1); } @Test public void testFailedReplicationSinkMetricsWithTranslation() throws IOException { - testFailedReplicationSinkMetrics(TABLE_NAME3, TABLE_NAME4); + testFailedReplicationSinkMetrics(TABLE_NAME3, FAM_NAME1, TABLE_NAME4); } - private WALEntry createEntry(TableName table, int row, KeyValue.Type type, - List cells) { - byte[] rowBytes = Bytes.toBytes(row); - byte[] fam = table.equals(TABLE_NAME2) || table.equals(TABLE_NAME4) ? FAM_NAME2 : FAM_NAME1; - byte[] qualifier = type.getCode() == KeyValue.Type.DeleteFamily.getCode() ? null : fam; - byte[] value = type.getCode() == KeyValue.Type.Put.getCode() ? 
rowBytes : null; - return createEntry(table, rowBytes, fam, qualifier, value, type, cells); + private void validatePuts(TableName tableName, Set rows, byte[] family, byte[] qualifier, + Table sinkTable) + throws IOException { + for (int row : rows) { + byte[] rowBytes = Bytes.toBytes(row); + Get get = new Get(TRANSLATOR.getSinkRowKey(tableName, rowBytes)); + Result res = sinkTable.get(get); + assertTrue(!res.isEmpty()); + assertArrayEquals(res.getValue(TRANSLATOR.getSinkFamily(tableName, family), TRANSLATOR.getSinkQualifier(tableName, family, qualifier)), rowBytes); + } } private WALEntry createEntry(TableName table, byte[] row, byte[] family, byte[] qualifier, @@ -616,7 +672,7 @@ private WALEntry createEntry(TableName table, byte[] row, byte[] family, byte[] } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { - kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.DeleteFamily); + kv = new KeyValue(row, family, null, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = createWALEntryBuilder(table); cells.add(kv); From 57eb69e3bb262a9017c02294206ae8a657ed152b Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Thu, 9 Jan 2025 09:23:37 -0500 Subject: [PATCH 8/9] Spotless:apply --- .../regionserver/TestReplicationSink.java | 60 ++++++++++++------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index f43237efd9a2..476b06f2ed56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -20,6 +20,7 @@ import static 
org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -76,7 +77,9 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; @@ -107,7 +110,8 @@ public class TestReplicationSink { protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); - protected static final TestReplicationSinkTranslator TRANSLATOR = new TestReplicationSinkTranslator(); + protected static final TestReplicationSinkTranslator TRANSLATOR = + new TestReplicationSinkTranslator(); protected static Stoppable STOPPABLE = new Stoppable() { final AtomicBoolean stop = new AtomicBoolean(false); @@ -215,7 +219,8 @@ public void testBatchSinkWithTranslation() throws Exception { /** * Insert a mix of puts and deletes */ - public void testMixedPutDelete(TableName tableName, byte[] family, Table sinkTable) throws Exception { + public void testMixedPutDelete(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); Set rows = new HashSet<>(); @@ -235,7 +240,8 @@ public void testMixedPutDelete(TableName tableName, byte[] family, Table sinkTab entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); rows.add(i); } else { - entries.add(createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + entries.add( + createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, 
cells)); rows.remove(i); } } @@ -258,7 +264,8 @@ public void testMixedPutDeleteWithTranslation() throws Exception { testMixedPutDelete(TABLE_NAME3, FAM_NAME1, table4); } - public void testLargeEditsPutDelete(TableName tableName, byte[] family, Table sinkTable) throws Exception { + public void testLargeEditsPutDelete(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(); List cells = new ArrayList<>(); Set rows = new HashSet<>(); @@ -285,7 +292,8 @@ public void testLargeEditsPutDelete(TableName tableName, byte[] family, Table si entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); rows.add(i); } else { - entries.add(createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + entries.add( + createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); rows.remove(i); } } @@ -313,8 +321,8 @@ public void testLargeEditsPutDeleteWithTranslation() throws Exception { /** * Insert to 2 different tables */ - public void testMixedPutTables(TableName tableName1, byte[] family1, Table sinkTable1, TableName tableName2, byte[] family2, - Table sinkTable2) throws Exception { + public void testMixedPutTables(TableName tableName1, byte[] family1, Table sinkTable1, + TableName tableName2, byte[] family2, Table sinkTable2) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); Set rows1 = new HashSet<>(); @@ -356,7 +364,8 @@ public void testMixedPutTablesWithTranslation() throws Exception { /** * Insert then do different types of deletes */ - public void testMixedDeletes(TableName tableName, byte[] family, Table sinkTable) throws Exception { + public void testMixedDeletes(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(3); List cells = new ArrayList<>(); for (int i = 0; i < 3; i++) { @@ -367,9 +376,12 @@ public void testMixedDeletes(TableName 
tableName, byte[] family, Table sinkTable replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(3); cells = new ArrayList<>(); - entries.add(createEntry(tableName, Bytes.toBytes(0), family, family, null, KeyValue.Type.DeleteColumn, cells)); - entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, KeyValue.Type.DeleteFamily, cells)); - entries.add(createEntry(tableName, Bytes.toBytes(2), family, family, null, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(0), family, family, null, + KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, + KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(2), family, family, null, + KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -393,7 +405,8 @@ public void testMixedDeletesWithTranslation() throws Exception { * Puts are buffered, but this tests when a delete (not-buffered) is applied before the actual Put * that creates it. 
*/ - public void testApplyDeleteBeforePut(TableName tableName, byte[] family, Table sinkTable) throws Exception { + public void testApplyDeleteBeforePut(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(5); List cells = new ArrayList<>(); Set rows = new HashSet<>(); @@ -403,7 +416,8 @@ public void testApplyDeleteBeforePut(TableName tableName, byte[] family, Table s rows.add(i); } int deleteRow = 1; - entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, + KeyValue.Type.DeleteFamily, cells)); rows.remove(deleteRow); for (int i = 3; i < 5; i++) { byte[] row = Bytes.toBytes(i); @@ -428,15 +442,16 @@ public void testApplyDeleteBeforePutWithTranslation() throws Exception { testApplyDeleteBeforePut(TABLE_NAME3, FAM_NAME1, table4); } - public void testRethrowRetriesExhaustedException(TableName tableName, byte[] family, TableName sinkTableName) - throws Exception { + public void testRethrowRetriesExhaustedException(TableName tableName, byte[] family, + TableName sinkTableName) throws Exception { TableName notExistTable = TableName.valueOf("notExistTable"); byte[] notExistFamily = Bytes.toBytes("notExistFamily"); List entries = new ArrayList<>(); List cells = new ArrayList<>(); for (int i = 0; i < 10; i++) { byte[] row = Bytes.toBytes(i); - entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, KeyValue.Type.Put, cells)); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, + KeyValue.Type.Put, cells)); } try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -570,8 +585,8 @@ public void testReplicateEntriesForHFilesWithTranslation() throws Exception { /** * Test failure metrics produced for failed replication edits */ - public void testFailedReplicationSinkMetrics(TableName tableName, 
byte[] family, TableName sinkTableName) - throws IOException { + public void testFailedReplicationSinkMetrics(TableName tableName, byte[] family, + TableName sinkTableName) throws IOException { long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); long errorCount = 0L; List entries = new ArrayList<>(BATCH_SIZE); @@ -598,7 +613,8 @@ public void testFailedReplicationSinkMetrics(TableName tableName, byte[] family, byte[] notExistFamily = Bytes.toBytes("notExistFamily"); for (int i = 0; i < BATCH_SIZE; i++) { byte[] row = Bytes.toBytes(i); - entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, KeyValue.Type.Put, cells)); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, + KeyValue.Type.Put, cells)); } try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -645,14 +661,14 @@ public void testFailedReplicationSinkMetricsWithTranslation() throws IOException } private void validatePuts(TableName tableName, Set rows, byte[] family, byte[] qualifier, - Table sinkTable) - throws IOException { + Table sinkTable) throws IOException { for (int row : rows) { byte[] rowBytes = Bytes.toBytes(row); Get get = new Get(TRANSLATOR.getSinkRowKey(tableName, rowBytes)); Result res = sinkTable.get(get); assertTrue(!res.isEmpty()); - assertArrayEquals(res.getValue(TRANSLATOR.getSinkFamily(tableName, family), TRANSLATOR.getSinkQualifier(tableName, family, qualifier)), rowBytes); + assertArrayEquals(res.getValue(TRANSLATOR.getSinkFamily(tableName, family), + TRANSLATOR.getSinkQualifier(tableName, family, qualifier)), rowBytes); } } From 0c2d864a0cc646b4c3cd807d4376c2dc22145c48 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Thu, 9 Jan 2025 14:19:05 -0500 Subject: [PATCH 9/9] Fix possibility of NPE --- .../hbase/replication/regionserver/ReplicationSink.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 3c3a5bd1bbab..e190ddce1798 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -188,18 +188,15 @@ private void decorateConf() { private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, - IdentityReplicationSinkTranslator.class); - ReplicationSinkTranslator translator = null; + IdentityReplicationSinkTranslator.class, ReplicationSinkTranslator.class); try { - translator = translatorClass == null - ? null - : (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { LOG.warn("Failed to instantiate " + translatorClass); + return new IdentityReplicationSinkTranslator(); } - return translator; } /**