diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index b2edce6b0fd1..8072277bf684 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -248,6 +249,8 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable
     try {
       if (backupAdmin.validateRequest(restoreRequest)) {
+        // Check whether any bulk load entry exists after this backup's cutoff and before endTime
+        checkBulkLoadAfterBackup(conn, sTableName, backup, endTime);
         return backup;
       }
     } catch (IOException e) {
@@ -259,6 +262,31 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable
     return null;
   }
 
+  /**
+   * Checks whether any bulk load operation occurred for the specified table after the last
+   * successful backup and before the restore time.
+   * @param conn Active HBase connection
+   * @param sTableName Table for which to check bulk load history
+   * @param backup Last successful backup before the target recovery time
+   * @param endTime Target recovery time
+   * @throws IOException if a bulk load entry is found between the backup time and endTime
+   */
+  private void checkBulkLoadAfterBackup(Connection conn, TableName sTableName,
+    PitrBackupMetadata backup, long endTime) throws IOException {
+    try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+      List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(List.of(sTableName));
+      long lastBackupTs = (backup.getType() == BackupType.FULL)
+        ? backup.getStartTs()
+        : backup.getIncrCommittedWalTs();
+      for (BulkLoad load : bulkLoads) {
+        if (lastBackupTs < load.getTimestamp() && load.getTimestamp() < endTime) {
+          throw new IOException("Bulk load operation detected after last successful backup for "
+            + "table: " + sTableName);
+        }
+      }
+    }
+  }
+
   /**
    * Determines if the given backup is valid for PITR.
    *

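The new check reduces to a window comparison: a bulk load blocks PITR exactly when it lands after the chosen backup's coverage cutoff but before the requested restore point, because no backup image contains the loaded HFiles. A minimal standalone sketch of that rule, distilled from checkBulkLoadAfterBackup above (the helper name is illustrative, not part of the patch):

  // Illustrative distillation of the rule enforced by checkBulkLoadAfterBackup.
  // lastBackupTs is startTs for a FULL backup, incrCommittedWalTs for an
  // INCREMENTAL one; restoreEndTime is the requested recovery point.
  static boolean bulkLoadBlocksPitr(long lastBackupTs, long bulkLoadTs, long restoreEndTime) {
    // Strictly inside the window means the loaded HFiles exist in no backup
    // image, so a point-in-time restore past them cannot be correct.
    return lastBackupTs < bulkLoadTs && bulkLoadTs < restoreEndTime;
  }
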
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
index 8b785a0f0504..b6d8d4901a22 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
@@ -19,6 +19,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -57,4 +58,14 @@ public String getBackupId() {
   public String getRootDir() {
     return image.getRootDir();
   }
+
+  @Override
+  public BackupType getType() {
+    return image.getType();
+  }
+
+  @Override
+  public long getIncrCommittedWalTs() {
+    return image.getIncrCommittedWalTs();
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
index 967fae551cb5..34d812121e02 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -57,4 +58,14 @@ public String getBackupId() {
   public String getRootDir() {
     return info.getBackupRootDir();
   }
+
+  @Override
+  public BackupType getType() {
+    return info.getType();
+  }
+
+  @Override
+  public long getIncrCommittedWalTs() {
+    return info.getIncrCommittedWalTs();
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index 59ae3857f2ec..f35755d24512 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -101,6 +101,11 @@ Builder withCompleteTime(long completeTime) {
       return this;
     }
 
+    Builder withIncrCommittedWalTs(long incrCommittedWalTs) {
+      image.setIncrCommittedWalTs(incrCommittedWalTs);
+      return this;
+    }
+
     BackupImage build() {
       return image;
     }
@@ -115,6 +120,7 @@ BackupImage build() {
     private long completeTs;
     private ArrayList<BackupImage> ancestors;
     private Map<TableName, Map<String, Long>> incrTimeRanges;
+    private long incrCommittedWalTs;
 
     static Builder newBuilder() {
       return new Builder();
@@ -125,13 +131,14 @@ public BackupImage() {
     }
 
     private BackupImage(String backupId, BackupType type, String rootDir, List<TableName> tableList,
-      long startTs, long completeTs) {
+      long startTs, long completeTs, long incrCommittedWalTs) {
       this.backupId = backupId;
       this.type = type;
       this.rootDir = rootDir;
       this.tableList = tableList;
       this.startTs = startTs;
       this.completeTs = completeTs;
+      this.incrCommittedWalTs = incrCommittedWalTs;
     }
 
     static BackupImage fromProto(BackupProtos.BackupImage im) {
@@ -139,6 +146,7 @@ static BackupImage fromProto(BackupProtos.BackupImage im) {
       String rootDir = im.getBackupRootDir();
       long startTs = im.getStartTs();
       long completeTs = im.getCompleteTs();
+      long incrCommittedWalTs = im.getIncrCommittedWalTs();
       List<HBaseProtos.TableName> tableListList = im.getTableListList();
       List<TableName> tableList = new ArrayList<>();
       for (HBaseProtos.TableName tn : tableListList) {
@@ -151,7 +159,8 @@ static BackupImage fromProto(BackupProtos.BackupImage im) {
         ? BackupType.FULL
         : BackupType.INCREMENTAL;
 
-      BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+      BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs,
+        incrCommittedWalTs);
       for (BackupProtos.BackupImage img : ancestorList) {
         image.addAncestor(fromProto(img));
       }
@@ -170,6 +179,7 @@ BackupProtos.BackupImage toProto() {
       builder.setBackupId(backupId);
       builder.setCompleteTs(completeTs);
       builder.setStartTs(startTs);
+      builder.setIncrCommittedWalTs(incrCommittedWalTs);
       if (type == BackupType.FULL) {
         builder.setBackupType(BackupProtos.BackupType.FULL);
       } else {
@@ -287,6 +297,14 @@ public long getCompleteTs() {
       return completeTs;
     }
 
+    public long getIncrCommittedWalTs() {
+      return incrCommittedWalTs;
+    }
+
+    public void setIncrCommittedWalTs(long incrCommittedWalTs) {
+      this.incrCommittedWalTs = incrCommittedWalTs;
+    }
+
     private void setCompleteTs(long completeTs) {
       this.completeTs = completeTs;
     }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 2afefa4a5502..31ada8b040b3 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -485,8 +485,8 @@ public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
           path = Bytes.toString(CellUtil.cloneValue(cell));
         }
       }
-      LOG.debug("found orig {} for {} of table {} with timestamp {}", path, fam, region,
-        timestamp);
+      LOG.debug("Found orig path {} for family {} of table {} and region {} with timestamp {}",
+        path, fam, table, region, timestamp);
       if (timestamp <= endTimestamp) {
         result.add(new BulkLoad(table, region, fam, path, row, timestamp));
       }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
index dc135ce79c08..3d143b336573 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -47,4 +48,10 @@ public interface PitrBackupMetadata {
   /** Returns Root directory where the backup is stored */
   String getRootDir();
+
+  /** Returns the backup type (FULL or INCREMENTAL) */
+  BackupType getType();
+
+  /** Returns the committed WAL timestamp recorded for an incremental backup */
+  long getIncrCommittedWalTs();
 }
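
Both adapters now surface the same two fields, so the PITR handler can derive one coverage cutoff whether the metadata came from a manifest BackupImage or a live BackupInfo. A hedged sketch of that derivation (illustrative helper, not part of the patch; it uses only the interface methods declared above):

  // Illustrative helper: the timestamp up to which a backup's data is covered.
  // A FULL backup captures the table as of its start time; an INCREMENTAL
  // backup replays WAL entries committed up to incrCommittedWalTs.
  static long coverageCutoff(PitrBackupMetadata backup) {
    return backup.getType() == BackupType.FULL
      ? backup.getStartTs()
      : backup.getIncrCommittedWalTs();
  }
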
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 170cc8665687..0978ff3ebef5 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -33,16 +34,13 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -68,8 +66,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
 
-  private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] COLUMN = Bytes.toBytes("col");
   private static final int ROWS_IN_BULK_LOAD = 100;
 
   @Test
@@ -186,11 +182,55 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws
     }
   }
 
-  private void verifyTable(Table t1) throws IOException {
-    Get g = new Get(ROW);
-    Result r = t1.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
+  @Test
+  public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, famName);
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // The test starts with no data and no bulk loaded rows
+      int expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
+
+      // Create a continuous full backup; bulk loads are tracked from here on
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup1));
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+      performBulkLoad("bulkPreIncr", methodName, tableName1);
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(5000);
+
+      // The incremental backup covers bulkPreIncr and clears its tracked entry
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup2));
+      assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      performBulkLoad("bulkPostIncr", methodName, tableName1);
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(10000);
+      long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection());
+
+      // Expect restore failure: no backup covers the bulkPostIncr bulk load
+      TableName restoredTable = TableName.valueOf("restoredTable");
+      String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 },
+        new TableName[] { restoredTable }, restoreTs, null);
+      int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+      assertNotEquals("Restore should fail since there is one bulk load without any backup", 0,
+        ret);
+    } finally {
+      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    }
   }
 
   private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index a1ce9c97a687..e9a0b50abcfa 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -67,8 +67,8 @@ private static void setUpBackups() throws Exception {
     // Simulate a backup taken 20 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
-    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
-    // table1
+    // Insert initial data into table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
 
     // Perform a full backup for table1 with continuous backup enabled
     String[] args =
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index 0ad1f5ba6191..b173848cd09d 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -65,6 +65,7 @@ message BackupImage {
   optional uint64 complete_ts = 6;
   repeated BackupImage ancestors = 7;
   repeated TableServerTimestamp tst_map = 8;
+  optional uint64 incr_committed_wal_ts = 9;
 }
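
Since incr_committed_wal_ts is optional with a previously unused tag, manifests written before this change still parse; the getter then returns the proto default of 0, which makes the new PITR check conservative (any tracked bulk load before the restore point blocks the restore). A hedged illustration, assuming the usual proto2 generated API and that the omitted BackupImage fields are optional like fields 6 through 8 shown here:

  // A pre-upgrade manifest image has no field 9: presence reads false and the
  // getter falls back to the uint64 default.
  BackupProtos.BackupImage oldImage = BackupProtos.BackupImage.newBuilder()
    .setBackupId("backup_1700000000000") // hypothetical backup id
    .setBackupType(BackupProtos.BackupType.INCREMENTAL)
    .build();
  assert !oldImage.hasIncrCommittedWalTs();
  assert oldImage.getIncrCommittedWalTs() == 0L;
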
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 98e6631e3055..7c6e9c010250 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -1195,6 +1195,11 @@ public int run(String[] args) throws Exception {
 
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
+    if (ret == 0) {
+      System.out.println("Bulk load completed successfully.");
+      System.out.println("IMPORTANT: If this table is part of continuous backup, take a backup "
+        + "of it immediately so the bulk-loaded data remains restorable.");
+    }
     System.exit(ret);
   }
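
The console reminder exists because bulk-loaded HFiles bypass the WAL, so continuous backup cannot replay them; until some backup captures them, PITR past the load time is impossible, which checkBulkLoadAfterBackup now enforces. A hedged sketch of the follow-up an operator or automation could run, assuming the standard BackupAdmin API; the table name and backup root are placeholders:

  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.BackupAdmin;
  import org.apache.hadoop.hbase.backup.BackupRequest;
  import org.apache.hadoop.hbase.backup.BackupType;
  import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class PostBulkLoadBackup {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
        // Take an incremental backup right after the bulk load so the loaded
        // HFiles are covered and PITR past this point stays possible.
        BackupRequest request = new BackupRequest.Builder()
          .withBackupType(BackupType.INCREMENTAL)
          .withTableList(List.of(TableName.valueOf("my_table"))) // placeholder table
          .withTargetRootDir("hdfs://backup/root") // placeholder root dir
          .build();
        String backupId = admin.backupTables(request);
        System.out.println("Follow-up backup complete: " + backupId);
      }
    }
  }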