File: BackupManager.java

@@ -356,6 +356,11 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}

+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+    throws IOException {
+    return systemTable.readBulkloadRows(tableList, endTimestamp);
+  }

public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}
File: BackupSystemTable.java

@@ -445,6 +445,16 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
* @param tableList list of table names
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+    return readBulkloadRows(tableList, Long.MAX_VALUE);
+  }
+
+  /**
+   * Reads the rows from backup table recording bulk loaded hfiles
+   * @param tableList list of table names
+   * @param endTimestamp upper bound timestamp for bulkload entries retrieval
+   */
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+    throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
@@ -457,8 +467,10 @@ public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
String path = null;
String region = null;
byte[] row = null;
+      long timestamp = 0L;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
+        timestamp = cell.getTimestamp();

Contributor: so is this timestamp the IncrCommittedWalTs? I cannot find another timestamp representing it, but it looks like this PR shares the same setup as #7150, which used BulkLoad#timestamp as the IncrCommittedWalTs?

Author: This timestamp is the bulk load timestamp, i.e. the time at which the bulk load operation is triggered. It is saved in the backup system table and compared with the IncrCommittedWalTs during incremental backup.

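To pin down the semantics the author describes, here is a minimal, self-contained sketch of the cutoff check; the class and method names are hypothetical, and the real comparison lives in readBulkloadRows below:

```java
// Sketch only: a bulk load entry qualifies for the incremental backup
// iff it was triggered at or before the backup's committed WAL timestamp
// (IncrCommittedWalTs); newer entries are left for the next backup.
public final class BulkLoadCutoffSketch {
  static boolean includedInIncrementalBackup(long bulkLoadTs, long incrCommittedWalTs) {
    return bulkLoadTs <= incrCommittedWalTs;
  }

  public static void main(String[] args) {
    long incrCommittedWalTs = 2_000L;
    System.out.println(includedInIncrementalBackup(1_500L, incrCommittedWalTs)); // true: copied now
    System.out.println(includedInIncrementalBackup(2_500L, incrCommittedWalTs)); // false: next backup
  }
}
```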
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -473,8 +485,11 @@ public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
-          result.add(new BulkLoad(table, region, fam, path, row));
-          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
+          LOG.debug("found orig {} for {} of table {} with timestamp {}", path, fam, region,
+            timestamp);
+          if (timestamp <= endTimestamp) {
+            result.add(new BulkLoad(table, region, fam, path, row, timestamp));
+          }

Contributor (on the LOG.debug line): nit: adjust the short form to clearer wording.

Suggested change:
-          LOG.debug("found orig {} for {} of table {} with timestamp {}", path, fam, region,
+          LOG.debug("found original path {} for column family {} of table {} with timestamp {}", path, fam, region,

}
}
}
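For context on where that timestamp comes from: every HBase cell carries a write time, which Cell#getTimestamp() reads back; that is what the loop above relies on when it treats the system-table write time as the bulk load time. A standalone sketch of the mechanism, assuming a reachable cluster and an existing table "demo" with column family "f":

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch only: a cell's server-assigned write time is recoverable via
// Cell#getTimestamp(); table and family names here are illustrative.
public final class CellTimestampSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("demo"))) {
      table.put(new Put(Bytes.toBytes("rk")).addColumn(Bytes.toBytes("f"),
        Bytes.toBytes("path"), Bytes.toBytes("/bulk/f/hfile")));
      Result r = table.get(new Get(Bytes.toBytes("rk")));
      List<Cell> cells = r.listCells();
      for (Cell cell : cells) {
        System.out.println(cell.getTimestamp()); // epoch millis of the write
      }
    }
  }
}
```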

File: BulkLoad.java

@@ -34,14 +34,16 @@ public class BulkLoad {
private final String columnFamily;
private final String hfilePath;
private final byte[] rowKey;
+  private final long timestamp;

public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
-    byte[] rowKey) {
+    byte[] rowKey, long timestamp) {
this.tableName = tableName;
this.region = region;
this.columnFamily = columnFamily;
this.hfilePath = hfilePath;
this.rowKey = rowKey;
+    this.timestamp = timestamp;
}

public TableName getTableName() {
@@ -64,6 +66,10 @@ public byte[] getRowKey() {
return rowKey;
}

+  public long getTimestamp() {
+    return timestamp;
+  }

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -75,19 +81,20 @@ public boolean equals(Object o) {
BulkLoad that = (BulkLoad) o;
return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
.append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
-      .append(rowKey, that.rowKey).isEquals();
+      .append(rowKey, that.rowKey).append(timestamp, that.timestamp).isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
-      .append(hfilePath).append(rowKey).toHashCode();
+      .append(hfilePath).append(rowKey).append(timestamp).toHashCode();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
.append("tableName", tableName).append("region", region).append("columnFamily", columnFamily)
.append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
.append("hfilePath", hfilePath).append("rowKey", rowKey).append("timestamp", timestamp)
.toString();
}
}
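Because timestamp now participates in equals and hashCode, two otherwise-identical entries recorded at different times compare unequal. A small sketch using the six-argument constructor above; all values are illustrative:

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;

// Sketch only: the same table/region/family/path/row recorded at two
// different times now yields two distinct BulkLoad entries.
public final class BulkLoadEqualitySketch {
  public static void main(String[] args) {
    byte[] row = { 0x01 };
    BulkLoad a = new BulkLoad(TableName.valueOf("t1"), "region1", "f", "/bulk/f/hfile", row, 1000L);
    BulkLoad b = new BulkLoad(TableName.valueOf("t1"), "region1", "f", "/bulk/f/hfile", row, 2000L);
    System.out.println(a.equals(b)); // false: only the timestamps differ
  }
}
```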

File: IncrementalTableBackupClient.java

@@ -136,7 +136,13 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
List<String> activeFiles = new ArrayList<>();
List<String> archiveFiles = new ArrayList<>();
-    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    List<BulkLoad> bulkLoads;
+    if (backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads =
+        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
+    } else {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+    }
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
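One way to read the branch above: the single-argument overload is just the unbounded case (its cutoff is Long.MAX_VALUE, per the BackupSystemTable change), so both paths collapse into one bounded read. A minimal sketch of that equivalence, assuming the overloads added in this PR; the helper class and method are hypothetical:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;

// Sketch only: collapse the two branches of handleBulkLoad into a single
// bounded read of the recorded bulk loads.
final class BulkLoadReadSketch {
  static List<BulkLoad> bulkLoadsToCopy(BackupManager manager, List<TableName> tables,
    boolean continuousBackupEnabled, long incrCommittedWalTs) throws IOException {
    // Continuous backup bounds the read by the committed WAL timestamp;
    // otherwise every recorded bulk load qualifies.
    long cutoff = continuousBackupEnabled ? incrCommittedWalTs : Long.MAX_VALUE;
    return manager.readBulkloadRows(tables, cutoff);
  }
}
```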

File: TestIncrementalBackupWithContinuous.java

@@ -39,7 +39,6 @@
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
- import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -70,7 +69,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);

private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] FAMILY = Bytes.toBytes("family");
private final byte[] COLUMN = Bytes.toBytes("col");
private static final int ROWS_IN_BULK_LOAD = 100;

@@ -80,7 +78,7 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
TableName tableName = TableName.valueOf("table_" + methodName);
-    Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+    Table t1 = TEST_UTIL.createTable(tableName, famName);

try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
int before = table.getBackupHistory().size();
@@ -105,10 +103,8 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception
assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableName),
new HashSet<>(manifest.getTableList()));

-      Put p = new Put(ROW);
-      p.addColumn(FAMILY, COLUMN, COLUMN);
-      t1.put(p);
-      Thread.sleep(5000);
+      loadTable(t1);
+      Thread.sleep(10000);

// Run incremental backup
LOG.info("Run incremental backup now");
@@ -135,68 +131,57 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
tables, tables, true));

-      verifyTable(t1);
+      assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
} finally {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
}
}

@Test
-  public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+  public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, famName);
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
-      // The test starts with some data, and no bulk loaded rows.
-      int expectedRowCount = NB_ROWS_IN_BATCH;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
-
-      // Bulk loads aren't tracked if the table isn't backed up yet
-      performBulkLoad("bulk1", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+      // The test starts with no data, and no bulk loaded rows.
+      int expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());

-      // Create a backup, bulk loads are now being tracked
-      String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+      // Create continuous backup, bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup1));

-      loadTable(TEST_UTIL.getConnection().getTable(table1));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      performBulkLoad("bulk2", methodName);
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+      performBulkLoad("bulkPreIncr", methodName, tableName1);
       expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
-
-      // Creating an incremental backup clears the bulk loads
-      performBulkLoad("bulk4", methodName);
-      performBulkLoad("bulk5", methodName);
-      performBulkLoad("bulk6", methodName);
-      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
-      String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(10000);
+
+      performBulkLoad("bulkPostIncr", methodName, tableName1);
+      assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      // Incremental backup
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup2));
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
-      int rowCountAfterBackup2 = expectedRowCount;
-
-      // Doing another bulk load, to check that this data will disappear after a restore operation
-      performBulkLoad("bulk7", methodName);
-      expectedRowCount += ROWS_IN_BULK_LOAD;
-      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(1, bulkloadsTemp.size());
-      BulkLoad bulk7 = bulkloadsTemp.get(0);
-
-      // Doing a restore. Overwriting the table implies clearing the bulk loads,
-      // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
-      // associated with backup 3 (loading of full backup, loading of incremental backup).
-      BackupAdmin client = getBackupAdmin();
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
-        new TableName[] { table1 }, new TableName[] { table1 }, true));
-      assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
-      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
-      assertEquals(3, bulkLoads.size());
+      // The bulkPostIncr bulk load entry should not be deleted after the incremental backup
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      TEST_UTIL.truncateTable(tableName1);
+      // Restore incremental backup
+      TableName[] tables = new TableName[] { tableName1 };
+      BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      client.restore(
+        BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
} finally {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
}
}
@@ -208,7 +193,8 @@ private void verifyTable(Table t1) throws IOException {
assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
}

-  private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+  private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
+    throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
Path hfilePath =
@@ -220,7 +206,7 @@ private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
listFiles(fs, baseDirectory, baseDirectory);

Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
-      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, baseDirectory);
assertFalse(result.isEmpty());
}

@@ -246,7 +232,7 @@ private static Set<String> listFiles(final FileSystem fs, final Path root, final
protected static void loadTable(Table table) throws Exception {
Put p; // 100 + 1 row to t1_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
p = new Put(Bytes.toBytes("rowLoad" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
table.put(p);
}