diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c882e71e877a..602e7a36d4c1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -968,6 +968,9 @@ public enum OperationStatusCode { public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT = "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl"; public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled"; + public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator"; + public static final String REPLICATION_SINK_TRANSLATOR_DEFAULT = + "org.apache.hadoop.hbase.replication.regionserver.IdentityReplicationSinkTranslator"; public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false; /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */ public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java index 38cf33090d9b..7df00b546c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java @@ -80,7 +80,7 @@ private ReplicationSinkTrackerTableCreator() { /* * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is - * enabled and table doesn't exists already. + * enabled and table doesn't exist already. 
*/ public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 2d0b4e32ced0..326f9cbf47a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -94,11 +94,12 @@ public class HFileReplicator implements Closeable { private int maxCopyThreads; private int copiesPerThread; private List sourceClusterIds; + private ReplicationSinkTranslator translator; public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath, Map>>> tableQueueMap, - Configuration conf, AsyncClusterConnection connection, List sourceClusterIds) - throws IOException { + Configuration conf, AsyncClusterConnection connection, List sourceClusterIds, + ReplicationSinkTranslator translator) throws IOException { this.sourceClusterConf = sourceClusterConf; this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath; this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath; @@ -106,6 +107,7 @@ public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespa this.conf = conf; this.connection = connection; this.sourceClusterIds = sourceClusterIds; + this.translator = translator; userProvider = UserProvider.instantiate(conf); fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); @@ -131,29 +133,30 @@ public void close() throws IOException { public Void replicate() throws IOException { // Copy all the hfiles to the local file system - Map tableStagingDirsMap = copyHFilesToStagingDir(); + Map tableToSinkStagingDir = copySourceHFilesToSinkStagingDir(); int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); - for (Entry tableStagingDir : tableStagingDirsMap.entrySet()) { - String tableNameString = tableStagingDir.getKey(); - Path stagingDir = tableStagingDir.getValue(); - TableName tableName = TableName.valueOf(tableNameString); + for (Entry tableStagingDir : tableToSinkStagingDir.entrySet()) { + String tableNameStr = tableStagingDir.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); + TableName sinkTableName = translator.getSinkTableName(tableName); + Path sinkStagingDir = tableStagingDir.getValue(); // Prepare collection of queue of hfiles to be loaded(replicated) Deque queue = new LinkedList<>(); - BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false, - false); + BulkLoadHFilesTool.prepareHFileQueue(conf, connection, sinkTableName, sinkStagingDir, queue, + false, false); if (queue.isEmpty()) { - LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri()); + LOG.warn("Did not find any files to replicate in directory {}", sinkStagingDir.toUri()); return null; } fsDelegationToken.acquireDelegationToken(sinkFs); try { - doBulkLoad(conf, tableName, stagingDir, queue, maxRetries); + doBulkLoad(conf, sinkTableName, sinkStagingDir, queue, maxRetries); } finally { - cleanup(stagingDir); + cleanup(sinkStagingDir); } } return null; @@ -194,12 +197,12 @@ private void cleanup(Path stagingDir) { // Do not close the file system } - private Map copyHFilesToStagingDir() throws IOException { + private Map copySourceHFilesToSinkStagingDir() 
throws IOException { Map mapOfCopiedHFiles = new HashMap<>(); Pair> familyHFilePathsPair; List hfilePaths; byte[] family; - Path familyStagingDir; + Path sinkFamilyStagingDir; int familyHFilePathsPairsListSize; int totalNoOfHFiles; List>> familyHFilePathsPairsList; @@ -224,32 +227,33 @@ private Map copyHFilesToStagingDir() throws IOException { // For each table name in the map for (Entry>>> tableEntry : bulkLoadHFileMap .entrySet()) { - String tableName = tableEntry.getKey(); + String tableNameStr = tableEntry.getKey(); + TableName tableName = TableName.valueOf(tableNameStr); // Create staging directory for each table - Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName)); + Path sinkStagingDir = createSinkStagingDir(hbaseStagingDir, user, tableName); familyHFilePathsPairsList = tableEntry.getValue(); familyHFilePathsPairsListSize = familyHFilePathsPairsList.size(); - // For each list of family hfile paths pair in the table + // For each (family, hfile paths) pair in the table for (int i = 0; i < familyHFilePathsPairsListSize; i++) { familyHFilePathsPair = familyHFilePathsPairsList.get(i); family = familyHFilePathsPair.getFirst(); hfilePaths = familyHFilePathsPair.getSecond(); - familyStagingDir = new Path(stagingDir, Bytes.toString(family)); + sinkFamilyStagingDir = getSinkFamilyStagingDir(sinkStagingDir, tableName, family); totalNoOfHFiles = hfilePaths.size(); - // For each list of hfile paths for the family + // For each hfile path in the family List> futures = new ArrayList<>(); Callable c; Future future; int currentCopied = 0; - // Copy the hfiles parallely + // Copy the hfiles in parallel while (totalNoOfHFiles > currentCopied + this.copiesPerThread) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread)); future = exec.submit(c); futures.add(future); @@ -258,7 +262,7 @@ private Map copyHFilesToStagingDir() throws IOException { int remaining = totalNoOfHFiles - currentCopied; if (remaining > 0) { - c = new Copier(sourceFs, familyStagingDir, + c = new Copier(sourceFs, sinkFamilyStagingDir, hfilePaths.subList(currentCopied, currentCopied + remaining)); future = exec.submit(c); futures.add(future); @@ -281,7 +285,7 @@ private Map copyHFilesToStagingDir() throws IOException { } // Add the staging directory to this table. 
Staging directory contains all the hfiles // belonging to this table - mapOfCopiedHFiles.put(tableName, stagingDir); + mapOfCopiedHFiles.put(tableNameStr, sinkStagingDir); } return mapOfCopiedHFiles; } finally { @@ -294,12 +298,14 @@ private Map copyHFilesToStagingDir() throws IOException { } } - private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException { - String tblName = tableName.getNameAsString().replace(":", UNDERSCORE); + private Path createSinkStagingDir(Path baseDir, User user, TableName tableName) + throws IOException { + TableName sinkTableName = translator.getSinkTableName(tableName); + String sinkTableNameStr = sinkTableName.getNameAsString().replace(":", UNDERSCORE); int RANDOM_WIDTH = 320; int RANDOM_RADIX = 32; String doubleUnderScore = UNDERSCORE + UNDERSCORE; - String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore + String randomDir = user.getShortName() + doubleUnderScore + sinkTableNameStr + doubleUnderScore + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX)); return createStagingDir(baseDir, user, randomDir); } @@ -311,50 +317,55 @@ private Path createStagingDir(Path baseDir, User user, String randomDir) throws return p; } + private Path getSinkFamilyStagingDir(Path baseDir, TableName tableName, byte[] family) { + byte[] sinkFamily = translator.getSinkFamily(tableName, family); + return new Path(baseDir, Bytes.toString(sinkFamily)); + } + /** * This class will copy the given hfiles from the given source file system to the given local file * system staging directory. */ private class Copier implements Callable { private FileSystem sourceFs; - private Path stagingDir; - private List hfiles; + private Path sinkStagingDir; + private List hfilePaths; - public Copier(FileSystem sourceFs, final Path stagingDir, final List hfiles) + public Copier(FileSystem sourceFs, final Path sinkStagingDir, final List hfilePaths) throws IOException { this.sourceFs = sourceFs; - this.stagingDir = stagingDir; - this.hfiles = hfiles; + this.sinkStagingDir = sinkStagingDir; + this.hfilePaths = hfilePaths; } @Override public Void call() throws IOException { Path sourceHFilePath; - Path localHFilePath; - int totalHFiles = hfiles.size(); + Path sinkHFilePath; + int totalHFiles = hfilePaths.size(); for (int i = 0; i < totalHFiles; i++) { - sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i)); - localHFilePath = new Path(stagingDir, sourceHFilePath.getName()); + sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfilePaths.get(i)); + sinkHFilePath = new Path(sinkStagingDir, sourceHFilePath.getName()); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); // If any other exception other than FNFE then we will fail the replication requests and // source will retry to replicate these data. } catch (FileNotFoundException e) { - LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". 
Trying to copy from hfile archive directory.", e); - sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i)); + sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfilePaths.get(i)); try { - FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf); + FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, sinkHFilePath, false, conf); } catch (FileNotFoundException e1) { // This will mean that the hfile does not exists any where in source cluster FS. So we // cannot do anything here just log and continue. - LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath + ". Hence ignoring this hfile from replication..", e1); continue; } } - sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS); + sinkFs.setPermission(sinkHFilePath, PERM_ALL_ACCESS); } return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java new file mode 100644 index 000000000000..cf1496eddf36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public class IdentityReplicationSinkTranslator implements ReplicationSinkTranslator { + @Override + public TableName getSinkTableName(TableName tableName) { + return tableName; + } + + @Override + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) { + return rowKey; + } + + @Override + public byte[] getSinkFamily(TableName tableName, byte[] family) { + return family; + } + + @Override + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) { + return qualifier; + } + + @Override + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) { + return cell; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 508ace390565..e190ddce1798 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -72,6 +72,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -139,6 +140,8 @@ public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerH Class c = Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); this.provider = c.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { throw new IllegalArgumentException( "Configured source fs configuration provider class " + className + " throws error.", e); @@ -153,6 +156,8 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException { filter = walEntryFilterClass == null ? null : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; } catch (Exception e) { LOG.warn("Failed to instantiate " + walEntryFilterClass); } @@ -181,6 +186,19 @@ private void decorateConf() { } } + private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException { + Class translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR, + IdentityReplicationSinkTranslator.class, ReplicationSinkTranslator.class); + try { + return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + LOG.warn("Failed to instantiate " + translatorClass); + return new IdentityReplicationSinkTranslator(); + } + } + /** * Replicate this array of entries directly into the local cluster using the native client. Only * operates against raw protobuf type saving on a conversion from pb to pojo. @@ -202,18 +220,22 @@ public void replicateEntries(List entries, final ExtendedCellScanner c // Very simple optimization where we batch sequences of rows going // to the same table. 
try { + ReplicationSinkTranslator translator = getReplicationSinkTranslator(); long totalReplicated = 0; - // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per - // invocation of this method per table and cluster id. - Map, List>> rowMap = new TreeMap<>(); + Map, Map>>>> bulkLoadsPerClusters = + new HashMap<>(); - Map, Map>>>> bulkLoadsPerClusters = null; - Pair, List> mutationsToWalEntriesPairs = + // Map of tableName => list of Rows, grouped by source cluster id. + // In each call to this method, we only want to flushCommits once per table per source + // cluster id + Map, List>> sinkRowMap = new TreeMap<>(); + Pair, List> sinkMutationsToWalEntriesPairs = new Pair<>(new ArrayList<>(), new ArrayList<>()); + for (WALEntry entry : entries) { - TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); + TableName tableName = TableName.valueOf(entry.getKey().getTableName().toByteArray()); if (this.walEntrySinkFilter != null) { - if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) { + if (this.walEntrySinkFilter.filter(tableName, entry.getKey().getWriteTime())) { // Skip Cells in CellScanner associated with this entry. int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { @@ -226,8 +248,8 @@ public void replicateEntries(List entries, final ExtendedCellScanner c continue; } } - ExtendedCell previousCell = null; - Mutation mutation = null; + ExtendedCell sinkCellPrev = null; + Mutation sinkMutation = null; int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off @@ -236,81 +258,71 @@ public void replicateEntries(List entries, final ExtendedCellScanner c throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } ExtendedCell cell = cells.current(); - // Handle bulk load hfiles replication if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + // Bulk load events BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); if (bld.getReplicate()) { - if (bulkLoadsPerClusters == null) { - bulkLoadsPerClusters = new HashMap<>(); - } - // Map of table name Vs list of pair of family and list of - // hfile paths from its namespace + // Map of tableNameStr to (family, hfile paths) pairs Map>>> bulkLoadHFileMap = bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); - buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); + buildBulkLoadHFileMap(bulkLoadHFileMap, bld); } } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { Mutation put = processReplicationMarkerEntry(cell); if (put == null) { continue; } - table = REPLICATION_SINK_TRACKER_TABLE_NAME; - List clusterIds = new ArrayList<>(); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } + List clusterIds = getSourceClusterIds(entry); put.setClusterIds(clusterIds); - addToHashMultiMap(rowMap, table, clusterIds, put); + addToHashMultiMap(sinkRowMap, REPLICATION_SINK_TRACKER_TABLE_NAME, clusterIds, put); } else { - // Handle wal replication - if (isNewRowOrType(previousCell, cell)) { - // Create new mutation - mutation = CellUtil.isDelete(cell) - ?
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); - for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { - clusterIds.add(toUUID(clusterId)); - } - mutation.setClusterIds(clusterIds); - mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, + TableName sinkTableName = translator.getSinkTableName(tableName); + ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName, cell); + if (isNewRowOrType(sinkCellPrev, sinkCell)) { + sinkMutation = CellUtil.isDelete(sinkCell) + ? new Delete(sinkCell.getRowArray(), sinkCell.getRowOffset(), + sinkCell.getRowLength()) + : new Put(sinkCell.getRowArray(), sinkCell.getRowOffset(), sinkCell.getRowLength()); + List clusterIds = getSourceClusterIds(entry); + sinkMutation.setClusterIds(clusterIds); + sinkMutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY); if (rsServerHost != null) { - rsServerHost.preReplicationSinkBatchMutate(entry, mutation); - mutationsToWalEntriesPairs.getFirst().add(mutation); - mutationsToWalEntriesPairs.getSecond().add(entry); + rsServerHost.preReplicationSinkBatchMutate(entry, sinkMutation); + sinkMutationsToWalEntriesPairs.getFirst().add(sinkMutation); + sinkMutationsToWalEntriesPairs.getSecond().add(entry); } - addToHashMultiMap(rowMap, table, clusterIds, mutation); + addToHashMultiMap(sinkRowMap, sinkTableName, clusterIds, sinkMutation); } - if (CellUtil.isDelete(cell)) { - ((Delete) mutation).add(cell); + if (CellUtil.isDelete(sinkCell)) { + ((Delete) sinkMutation).add(sinkCell); } else { - ((Put) mutation).add(cell); + ((Put) sinkMutation).add(sinkCell); } - previousCell = cell; + sinkCellPrev = sinkCell; } } totalReplicated++; } // TODO Replicating mutations and bulk loaded data can be made parallel - if (!rowMap.isEmpty()) { + if (!sinkRowMap.isEmpty()) { LOG.debug("Started replicating mutations."); - for (Entry, List>> entry : rowMap.entrySet()) { + for (Entry, List>> entry : sinkRowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold); } LOG.debug("Finished replicating mutations."); - } - if (rsServerHost != null) { - List mutations = mutationsToWalEntriesPairs.getFirst(); - List walEntries = mutationsToWalEntriesPairs.getSecond(); - for (int i = 0; i < mutations.size(); i++) { - rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i)); + if (rsServerHost != null) { + List sinkMutations = sinkMutationsToWalEntriesPairs.getFirst(); + List walEntries = sinkMutationsToWalEntriesPairs.getSecond(); + for (int i = 0; i < sinkMutations.size(); i++) { + rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), sinkMutations.get(i)); + } } } - if (bulkLoadsPerClusters != null) { + if (!bulkLoadsPerClusters.isEmpty()) { for (Entry, Map>>>> entry : bulkLoadsPerClusters.entrySet()) { Map>>> bulkLoadHFileMap = entry.getValue(); @@ -319,7 +331,7 @@ public void replicateEntries(List entries, final ExtendedCellScanner c Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId); try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf, sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf, - getConnection(), entry.getKey())) { + getConnection(), entry.getKey(), translator)) { hFileReplicator.replicate(); LOG.debug("Finished replicating {} bulk loaded data", 
entry.getKey().toString()); } @@ -338,6 +350,14 @@ public void replicateEntries(List entries, final ExtendedCellScanner c } } + private List getSourceClusterIds(WALEntry entry) { + List clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size()); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + return clusterIds; + } + /* * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. * If false, then ignore this cell. If set to true, de-serialize value into @@ -366,11 +386,10 @@ private Put processReplicationMarkerEntry(Cell cell) throws IOException { } private void buildBulkLoadHFileMap( - final Map>>> bulkLoadHFileMap, TableName table, - BulkLoadDescriptor bld) throws IOException { + final Map>>> bulkLoadHFileMap, BulkLoadDescriptor bld) + throws IOException { List storesList = bld.getStoresList(); - int storesSize = storesList.size(); - for (int j = 0; j < storesSize; j++) { + for (int j = 0; j < storesList.size(); j++) { StoreDescriptor storeDescriptor = storesList.get(j); List storeFileList = storeDescriptor.getStoreFileList(); int storeFilesSize = storeFileList.size(); @@ -378,10 +397,11 @@ private void buildBulkLoadHFileMap( for (int k = 0; k < storeFilesSize; k++) { byte[] family = storeDescriptor.getFamilyName().toByteArray(); - // Build hfile relative path from its namespace - String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family); - String tableName = table.getNameWithNamespaceInclAsString(); - List>> familyHFilePathsList = bulkLoadHFileMap.get(tableName); + // Build relative hfile path starting with its namespace dir + TableName tableName = ProtobufUtil.toTableName(bld.getTableName()); + String pathToHfileFromNS = getHFilePath(tableName, bld, storeFileList.get(k), family); + String tableNameStr = tableName.getNameWithNamespaceInclAsString(); + List>> familyHFilePathsList = bulkLoadHFileMap.get(tableNameStr); if (familyHFilePathsList != null) { boolean foundFamily = false; for (Pair> familyHFilePathsPair : familyHFilePathsList) { @@ -393,12 +413,12 @@ private void buildBulkLoadHFileMap( } } if (!foundFamily) { - // Family not found, add this family and its hfile paths pair to the list + // Family not found, add this (family, hfile paths) pair to the list addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList); } } else { // Add this table entry into the map - addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName); + addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableNameStr); } } } @@ -422,10 +442,10 @@ private void addNewTableEntryInMap( bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList); } - private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile, + private String getHFilePath(TableName tableName, BulkLoadDescriptor bld, String storeFile, byte[] family) { - return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR) - .append(table.getQualifierAsString()).append(Path.SEPARATOR) + return new StringBuilder(100).append(tableName.getNamespaceAsString()).append(Path.SEPARATOR) + .append(tableName.getQualifierAsString()).append(Path.SEPARATOR) .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR) .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString(); } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java new file mode 100644 index 000000000000..5b0168d51b81 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public interface ReplicationSinkTranslator { + + public TableName getSinkTableName(TableName tableName); + + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey); + + public byte[] getSinkFamily(TableName tableName, byte[] family); + + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier); + + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index bdb51ebe36f8..476b06f2ed56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -35,7 +37,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.ExtendedCellBuilder; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; @@ -97,11 +102,17 @@ public class TestReplicationSink { protected static final TableName TABLE_NAME1 = TableName.valueOf("table1"); protected static final TableName TABLE_NAME2 = TableName.valueOf("table2"); + protected static final TableName TABLE_NAME3 = TableName.valueOf("table3"); + protected static final TableName TABLE_NAME4 = TableName.valueOf("table4"); + protected static final TableName TABLE_NAME5 = TableName.valueOf("table5"); 
+ protected static final TableName TABLE_NAME6 = TableName.valueOf("table6"); protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); - protected static Table table1; + protected static final TestReplicationSinkTranslator TRANSLATOR = + new TestReplicationSinkTranslator(); + protected static Stoppable STOPPABLE = new Stoppable() { final AtomicBoolean stop = new AtomicBoolean(false); @@ -118,7 +129,12 @@ public void stop(String why) { }; + protected static Table table1; protected static Table table2; + protected static Table table3; + protected static Table table4; + protected static Table table5; + protected static Table table6; protected static String baseNamespaceDir; protected static String hfileArchiveDir; protected static String replicationClusterId; @@ -130,12 +146,18 @@ public void stop(String why) { public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); + TEST_UTIL.getConfiguration().setClass(HConstants.REPLICATION_SINK_TRANSLATOR, + TestReplicationSinkTranslator.class, ReplicationSinkTranslator.class); TEST_UTIL.startMiniCluster(3); RegionServerCoprocessorHost rsCpHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost(); SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), rsCpHost); table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); + table3 = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAME1); + table4 = TEST_UTIL.createTable(TABLE_NAME4, FAM_NAME2); + table5 = TEST_UTIL.createTable(TABLE_NAME5, FAM_NAME1); + table6 = TEST_UTIL.createTable(TABLE_NAME6, FAM_NAME1); Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()); baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString(); hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString(); @@ -158,34 +180,54 @@ public static void tearDownAfterClass() throws Exception { public void setUp() throws Exception { table1 = TEST_UTIL.deleteTableData(TABLE_NAME1); table2 = TEST_UTIL.deleteTableData(TABLE_NAME2); + table3 = TEST_UTIL.deleteTableData(TABLE_NAME3); + table4 = TEST_UTIL.deleteTableData(TABLE_NAME4); + table5 = TEST_UTIL.deleteTableData(TABLE_NAME5); + table6 = TEST_UTIL.deleteTableData(TABLE_NAME6); } /** * Insert a whole batch of entries */ - @Test - public void testBatchSink() throws Exception { + public void testBatchSink(TableName tableName, byte[] family, Table sinkTable) throws Exception { List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); + validatePuts(tableName, rows, family, family, sinkTable); + } + + @Test + public void testBatchSink() throws Exception { + testBatchSink(TABLE_NAME1, 
FAM_NAME1, table1); + } + + @Test + public void testBatchSinkWithTranslation() throws Exception { + testBatchSink(TABLE_NAME3, FAM_NAME1, table4); } /** * Insert a mix of puts and deletes */ - @Test - public void testMixedPutDelete() throws Exception { + public void testMixedPutDelete(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE / 2; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); @@ -193,28 +235,49 @@ public void testMixedPutDelete() throws Exception { entries = new ArrayList<>(BATCH_SIZE); cells = new ArrayList<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, - i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 != 0) { + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); + } else { + entries.add( + createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + rows.remove(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(BATCH_SIZE / 2, scanRes.next(BATCH_SIZE).length); + validatePuts(tableName, rows, family, family, sinkTable); } @Test - public void testLargeEditsPutDelete() throws Exception { + public void testMixedPutDelete() throws Exception { + testMixedPutDelete(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testMixedPutDeleteWithTranslation() throws Exception { + testMixedPutDelete(TABLE_NAME3, FAM_NAME1, table4); + } + + public void testLargeEditsPutDelete(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < 5510; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - ResultScanner resultScanner = table1.getScanner(new Scan()); + ResultScanner resultScanner = sinkTable.getScanner(new Scan()); int totalRows = 0; while (resultScanner.next() != null) { totalRows++; @@ -224,98 +287,171 @@ public void testLargeEditsPutDelete() throws Exception { entries = new ArrayList<>(); cells = new ArrayList<>(); for (int i = 0; i < 11000; i++) { - entries.add(createEntry(TABLE_NAME1, i, - i % 2 != 0 ? 
KeyValue.Type.Put : KeyValue.Type.DeleteColumn, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 != 0) { + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); + } else { + entries.add( + createEntry(tableName, row, family, family, null, KeyValue.Type.DeleteColumn, cells)); + rows.remove(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - resultScanner = table1.getScanner(new Scan()); + resultScanner = sinkTable.getScanner(new Scan()); totalRows = 0; while (resultScanner.next() != null) { totalRows++; } assertEquals(5500, totalRows); + validatePuts(tableName, rows, family, family, sinkTable); + } + + @Test + public void testLargeEditsPutDelete() throws Exception { + testLargeEditsPutDelete(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testLargeEditsPutDeleteWithTranslation() throws Exception { + testLargeEditsPutDelete(TABLE_NAME3, FAM_NAME1, table4); } /** * Insert to 2 different tables */ - @Test - public void testMixedPutTables() throws Exception { + public void testMixedPutTables(TableName tableName1, byte[] family1, Table sinkTable1, + TableName tableName2, byte[] family2, Table sinkTable2) throws Exception { List entries = new ArrayList<>(BATCH_SIZE / 2); List cells = new ArrayList<>(); + Set rows1 = new HashSet<>(); + Set rows2 = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + if (i % 2 == 0) { + entries.add(createEntry(tableName2, row, family2, family2, row, KeyValue.Type.Put, cells)); + rows2.add(i); + } else { + entries.add(createEntry(tableName1, row, family1, family1, row, KeyValue.Type.Put, cells)); + rows1.add(i); + } } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table2.getScanner(scan); + ResultScanner scanRes = sinkTable2.getScanner(scan); for (Result res : scanRes) { assertEquals(0, Bytes.toInt(res.getRow()) % 2); } - scanRes = table1.getScanner(scan); + scanRes = sinkTable1.getScanner(scan); for (Result res : scanRes) { assertEquals(1, Bytes.toInt(res.getRow()) % 2); } } + @Test + public void testMixedPutTables() throws Exception { + testMixedPutTables(TABLE_NAME1, FAM_NAME1, table1, TABLE_NAME2, FAM_NAME2, table2); + } + + @Test + public void testMixedPutTablesWithTranslation() throws Exception { + testMixedPutTables(TABLE_NAME3, FAM_NAME1, table4, TABLE_NAME2, FAM_NAME2, table2); + } + /** * Insert then do different types of deletes */ - @Test - public void testMixedDeletes() throws Exception { + public void testMixedDeletes(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(3); List cells = new ArrayList<>(); for (int i = 0; i < 3; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); entries = new ArrayList<>(3); cells = new ArrayList<>(); - entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells)); - entries.add(createEntry(TABLE_NAME1, 1, 
KeyValue.Type.DeleteFamily, cells)); - entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(0), family, family, null, + KeyValue.Type.DeleteColumn, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, + KeyValue.Type.DeleteFamily, cells)); + entries.add(createEntry(tableName, Bytes.toBytes(2), family, family, null, + KeyValue.Type.DeleteColumn, cells)); SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); + ResultScanner scanRes = sinkTable.getScanner(scan); assertEquals(0, scanRes.next(3).length); } + @Test + public void testMixedDeletes() throws Exception { + testMixedDeletes(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testMixedDeletesWithTranslation() throws Exception { + testMixedDeletes(TABLE_NAME3, FAM_NAME1, table4); + } + /** * Puts are buffered, but this tests when a delete (not-buffered) is applied before the actual Put * that creates it. */ - @Test - public void testApplyDeleteBeforePut() throws Exception { + public void testApplyDeleteBeforePut(TableName tableName, byte[] family, Table sinkTable) + throws Exception { List entries = new ArrayList<>(5); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < 2; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); - } - entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); + } + int deleteRow = 1; + entries.add(createEntry(tableName, Bytes.toBytes(1), family, null, null, + KeyValue.Type.DeleteFamily, cells)); + rows.remove(deleteRow); for (int i = 3; i < 5; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - Get get = new Get(Bytes.toBytes(1)); - Result res = table1.get(get); + Get get = new Get(TRANSLATOR.getSinkRowKey(tableName, Bytes.toBytes(deleteRow))); + Result res = sinkTable.get(get); assertEquals(0, res.size()); + validatePuts(tableName, rows, family, family, sinkTable); } @Test - public void testRethrowRetriesExhaustedException() throws Exception { + public void testApplyDeleteBeforePut() throws Exception { + testApplyDeleteBeforePut(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testApplyDeleteBeforePutWithTranslation() throws Exception { + testApplyDeleteBeforePut(TABLE_NAME3, FAM_NAME1, table4); + } + + public void testRethrowRetriesExhaustedException(TableName tableName, byte[] family, + TableName sinkTableName) throws Exception { TableName notExistTable = TableName.valueOf("notExistTable"); + byte[] notExistFamily = Bytes.toBytes("notExistFamily"); List entries = new ArrayList<>(); List cells = new ArrayList<>(); for (int i = 0; i < 10; i++) { - entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, + KeyValue.Type.Put, cells)); } try { 
SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -326,11 +462,12 @@ public void testRethrowRetriesExhaustedException() throws Exception { entries.clear(); cells.clear(); for (int i = 0; i < 10; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); } try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = conn.getAdmin()) { - admin.disableTable(TABLE_NAME1); + admin.disableTable(sinkTableName); try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, @@ -338,17 +475,27 @@ public void testRethrowRetriesExhaustedException() throws Exception { Assert.fail("Should re-throw RetriesExhaustedWithDetailsException."); } catch (RetriesExhaustedException e) { } finally { - admin.enableTable(TABLE_NAME1); + admin.enableTable(sinkTableName); } } } } + @Test + public void testRethrowRetriesExhaustedException() throws Exception { + testRethrowRetriesExhaustedException(TABLE_NAME1, FAM_NAME1, TABLE_NAME1); + } + + @Test + public void testRethrowRetriesExhaustedExceptionWithTranslation() throws Exception { + testRethrowRetriesExhaustedException(TABLE_NAME3, FAM_NAME1, TABLE_NAME4); + } + /** * Test replicateEntries with a bulk load entry for 25 HFiles */ - @Test - public void testReplicateEntriesForHFiles() throws Exception { + public void testReplicateEntriesForHFiles(TableName tableName, byte[] family, Table sinkTable) + throws Exception { Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries"); Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1)); int numRows = 10; @@ -371,7 +518,7 @@ public void testReplicateEntriesForHFiles() throws Exception { Iterator numbersItr = numberList.iterator(); for (int i = 0; i < 25; i++) { Path hfilePath = new Path(familyDir, hfilePrefix + i); - HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1, + HFileTestUtil.createHFile(conf, fs, hfilePath, family, family, Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows); p.add(hfilePath); storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen()); @@ -379,14 +526,14 @@ public void testReplicateEntriesForHFiles() throws Exception { // 3. Create a BulkLoadDescriptor and a WALEdit Map> storeFiles = new HashMap<>(1); - storeFiles.put(FAM_NAME1, p); + storeFiles.put(family, p); org.apache.hadoop.hbase.wal.WALEdit edit = null; WALProtos.BulkLoadDescriptor loadDescriptor = null; try (Connection c = ConnectionFactory.createConnection(conf); - RegionLocator l = c.getRegionLocator(TABLE_NAME1)) { + RegionLocator l = c.getRegionLocator(tableName)) { RegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegion(); - loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1, + loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(tableName, UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor); @@ -394,12 +541,12 @@ public void testReplicateEntriesForHFiles() throws Exception { List entries = new ArrayList<>(1); // 4. Create a WALEntryBuilder - WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1); + WALEntry.Builder builder = createWALEntryBuilder(tableName); // 5. 
Copy the hfile to the path as it is in reality for (int i = 0; i < 25; i++) { - String pathToHfileFromNS = new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()) - .append(Path.SEPARATOR).append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR) + String pathToHfileFromNS = new StringBuilder(100).append(tableName.getNamespaceAsString()) + .append(Path.SEPARATOR).append(Bytes.toString(tableName.getName())).append(Path.SEPARATOR) .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray())) .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR) .append(hfilePrefix + i).toString(); @@ -409,7 +556,7 @@ public void testReplicateEntriesForHFiles() throws Exception { } entries.add(builder.build()); - try (ResultScanner scanner = table1.getScanner(new Scan())) { + try (ResultScanner scanner = sinkTable.getScanner(new Scan())) { // 6. Assert no existing data in table assertEquals(0, scanner.next(numRows).length); } @@ -418,24 +565,37 @@ public void testReplicateEntriesForHFiles() throws Exception { PrivateCellUtil .createExtendedCellScanner(WALEditInternalHelper.getExtendedCells(edit).iterator()), replicationClusterId, baseNamespaceDir, hfileArchiveDir); - try (ResultScanner scanner = table1.getScanner(new Scan())) { + try (ResultScanner scanner = sinkTable.getScanner(new Scan())) { // 8. Assert data is replicated assertEquals(numRows, scanner.next(numRows).length); } - // Clean up the created hfiles or it will mess up subsequent tests + // Clean up the created hfiles, or it will mess up subsequent tests + } + + @Test + public void testReplicateEntriesForHFiles() throws Exception { + testReplicateEntriesForHFiles(TABLE_NAME1, FAM_NAME1, table1); + } + + @Test + public void testReplicateEntriesForHFilesWithTranslation() throws Exception { + testReplicateEntriesForHFiles(TABLE_NAME5, FAM_NAME1, table6); } /** * Test failure metrics produced for failed replication edits */ - @Test - public void testFailedReplicationSinkMetrics() throws IOException { + public void testFailedReplicationSinkMetrics(TableName tableName, byte[] family, + TableName sinkTableName) throws IOException { long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); long errorCount = 0L; List entries = new ArrayList<>(BATCH_SIZE); List cells = new ArrayList<>(); + Set rows = new HashSet<>(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); + rows.add(i); } cells.clear(); // cause IndexOutOfBoundsException try { @@ -450,8 +610,11 @@ public void testFailedReplicationSinkMetrics() throws IOException { entries.clear(); cells.clear(); TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException + byte[] notExistFamily = Bytes.toBytes("notExistFamily"); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); + byte[] row = Bytes.toBytes(i); + entries.add(createEntry(notExistTable, row, notExistFamily, notExistFamily, row, + KeyValue.Type.Put, cells)); } try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), @@ -465,12 +628,13 @@ public void testFailedReplicationSinkMetrics() throws IOException { entries.clear(); cells.clear(); for (int i = 0; i < BATCH_SIZE; i++) { - entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + byte[] row = 
Bytes.toBytes(i); + entries.add(createEntry(tableName, row, family, family, row, KeyValue.Type.Put, cells)); } // cause IOException in batch() try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Admin admin = conn.getAdmin()) { - admin.disableTable(TABLE_NAME1); + admin.disableTable(sinkTableName); try { SINK.replicateEntries(entries, PrivateCellUtil.createExtendedCellScanner(cells.iterator()), replicationClusterId, @@ -480,16 +644,36 @@ public void testFailedReplicationSinkMetrics() throws IOException { errorCount++; assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); } finally { - admin.enableTable(TABLE_NAME1); + admin.enableTable(sinkTableName); } } } } - private WALEntry createEntry(TableName table, int row, KeyValue.Type type, - List cells) { - byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; - byte[] rowBytes = Bytes.toBytes(row); + @Test + public void testFailedReplicationSinkMetrics() throws IOException { + testFailedReplicationSinkMetrics(TABLE_NAME1, FAM_NAME1, TABLE_NAME1); + } + + @Test + public void testFailedReplicationSinkMetricsWithTranslation() throws IOException { + testFailedReplicationSinkMetrics(TABLE_NAME3, FAM_NAME1, TABLE_NAME4); + } + + private void validatePuts(TableName tableName, Set rows, byte[] family, byte[] qualifier, + Table sinkTable) throws IOException { + for (int row : rows) { + byte[] rowBytes = Bytes.toBytes(row); + Get get = new Get(TRANSLATOR.getSinkRowKey(tableName, rowBytes)); + Result res = sinkTable.get(get); + assertTrue(!res.isEmpty()); + assertArrayEquals(res.getValue(TRANSLATOR.getSinkFamily(tableName, family), + TRANSLATOR.getSinkQualifier(tableName, family, qualifier)), rowBytes); + } + } + + private WALEntry createEntry(TableName table, byte[] row, byte[] family, byte[] qualifier, + byte[] value, KeyValue.Type type, List cells) { // Just make sure we don't get the same ts for two consecutive rows with // same key try { @@ -500,11 +684,11 @@ private WALEntry createEntry(TableName table, int row, KeyValue.Type type, final long now = EnvironmentEdgeManager.currentTime(); KeyValue kv = null; if (type.getCode() == KeyValue.Type.Put.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.Put, Bytes.toBytes(row)); + kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.Put, value); } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, KeyValue.Type.DeleteColumn); + kv = new KeyValue(row, family, qualifier, now, KeyValue.Type.DeleteColumn); } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { - kv = new KeyValue(rowBytes, fam, null, now, KeyValue.Type.DeleteFamily); + kv = new KeyValue(row, family, null, now, KeyValue.Type.DeleteFamily); } WALEntry.Builder builder = createWALEntryBuilder(table); cells.add(kv); @@ -512,7 +696,7 @@ private WALEntry createEntry(TableName table, int row, KeyValue.Type type, return builder.build(); } - public static WALEntry.Builder createWALEntryBuilder(TableName table) { + public static WALEntry.Builder createWALEntryBuilder(TableName tableName) { WALEntry.Builder builder = WALEntry.newBuilder(); builder.setAssociatedCellCount(1); WALKey.Builder keyBuilder = WALKey.newBuilder(); @@ -520,11 +704,63 @@ public static WALEntry.Builder createWALEntryBuilder(TableName table) { uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits()); 
uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits()); keyBuilder.setClusterId(uuidBuilder.build()); - keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(table.getName())); + keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(tableName.getName())); keyBuilder.setWriteTime(EnvironmentEdgeManager.currentTime()); keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(HConstants.EMPTY_BYTE_ARRAY)); keyBuilder.setLogSequenceNumber(-1); builder.setKey(keyBuilder.build()); return builder; } + + public static class TestReplicationSinkTranslator implements ReplicationSinkTranslator { + + private static final ExtendedCellBuilder EXTENDED_CELL_BUILDER = + ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY); + + @Override + public TableName getSinkTableName(TableName tableName) { + if (tableName.equals(TABLE_NAME3)) { + return TABLE_NAME4; + } else if (tableName.equals(TABLE_NAME5)) { + return TABLE_NAME6; + } + return tableName; + } + + @Override + public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) { + return rowKey; + } + + @Override + public byte[] getSinkFamily(TableName tableName, byte[] family) { + if (tableName.equals(TABLE_NAME3) && Bytes.equals(family, FAM_NAME1)) { + return FAM_NAME2; + } + return family; + } + + @Override + public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) { + return qualifier; + } + + @Override + public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) { + if ( + tableName.equals(TABLE_NAME3) && Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), FAM_NAME1, 0, FAM_NAME1.length) + ) { + return EXTENDED_CELL_BUILDER.clear() + .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).setFamily(FAM_NAME2) + .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()) + .setTimestamp(cell.getTimestamp()) + .setTags(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()) + .setSequenceId(cell.getSequenceId()).setType(cell.getType()) + .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()).build(); + } + return cell; + } + } }
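For reviewers trying out the new hook, a minimal sketch of a custom translator follows. It assumes only the ReplicationSinkTranslator interface and the hbase.replication.sink.translator key introduced in this patch; the class name RenamingSinkTranslator and the events/events_replica table names are invented for illustration. Only the table name is translated here; all other methods pass values through unchanged, mirroring IdentityReplicationSinkTranslator.

package org.example.replication;

import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkTranslator;

/**
 * Illustrative translator: edits replicated for the source table "events" are written to
 * "events_replica" on the sink cluster; every other table, and all row keys, families,
 * qualifiers, and cells, are passed through unchanged.
 */
public class RenamingSinkTranslator implements ReplicationSinkTranslator {

  @Override
  public TableName getSinkTableName(TableName tableName) {
    // Redirect "events" to "events_replica" in the same namespace; leave other tables alone.
    if ("events".equals(tableName.getQualifierAsString())) {
      return TableName.valueOf(tableName.getNamespaceAsString(), "events_replica");
    }
    return tableName;
  }

  @Override
  public byte[] getSinkRowKey(TableName tableName, byte[] rowKey) {
    return rowKey; // keep the source row key
  }

  @Override
  public byte[] getSinkFamily(TableName tableName, byte[] family) {
    return family; // keep the source column family
  }

  @Override
  public byte[] getSinkQualifier(TableName tableName, byte[] family, byte[] qualifier) {
    return qualifier; // keep the source qualifier
  }

  @Override
  public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) {
    return cell; // no per-cell rewriting needed for a pure table rename
  }
}

To enable such a translator, the sink cluster's region servers would set hbase.replication.sink.translator (HConstants.REPLICATION_SINK_TRANSLATOR) to the fully qualified class name, as the test in this patch does via conf.setClass(...); when the key is unset, ReplicationSink falls back to IdentityReplicationSinkTranslator.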