From 3becd6cf14f7b62cc07de3f9dcd3229aa2a8d3e4 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Wed, 17 Apr 2024 13:05:51 -0700 Subject: [PATCH] HDDS-9039. Removed the pause and wait in RocksDB compaction when tarball creation is in progress --- .../rocksdiff/RocksDBCheckpointDiffer.java | 39 ------------------ .../ozone/om/TestOMDbCheckpointServlet.java | 28 +------------ .../ozone/om/OMDBCheckpointServlet.java | 41 +++++-------------- 3 files changed, 12 insertions(+), 96 deletions(-) diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 97d015fb2392..cfd252364735 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.collections.CollectionUtils; @@ -171,7 +170,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable, = new BootstrapStateHandler.Lock(); private ColumnFamilyHandle snapshotInfoTableCFHandle; - private final AtomicInteger tarballRequestCount; private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService"; private AtomicBoolean suspended; @@ -248,7 +246,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable, } else { this.scheduler = null; } - this.tarballRequestCount = new AtomicInteger(0); } private String createCompactionLogDir(String metadataDirName, @@ -541,8 +538,6 @@ public void onCompactionCompleted(RocksDB db, return; } - waitForTarballCreation(); - // Add the compaction log entry to Compaction log table. addToCompactionLogTable(compactionLogEntry); @@ -583,22 +578,6 @@ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) { } } - /** - * Check if there is any in_progress tarball creation request and wait till - * all tarball creation finish, and it gets notified. - */ - private void waitForTarballCreation() { - while (tarballRequestCount.get() != 0) { - try { - wait(Integer.MAX_VALUE); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Compaction log thread {} is interrupted.", - Thread.currentThread().getName()); - } - } - } - /** * Creates a hard link between provided link and source. * It doesn't throw any exception if {@link Files#createLink} throws @@ -1449,28 +1428,10 @@ public void pruneSstFiles() { } } - public void incrementTarballRequestCount() { - tarballRequestCount.incrementAndGet(); - } - - public void decrementTarballRequestCountAndNotify() { - // Synchronized block is used to ensure that lock is on the same instance notifyAll is being called. - synchronized (this) { - tarballRequestCount.decrementAndGet(); - // Notify compaction threads to continue. - notifyAll(); - } - } - public boolean shouldRun() { return !suspended.get(); } - @VisibleForTesting - public int getTarballRequestCount() { - return tarballRequestCount.get(); - } - @VisibleForTesting public boolean debugEnabled(Integer level) { return DEBUG_LEVEL.contains(level); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 68ed3536a643..f0f4744e8c91 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -414,13 +414,8 @@ public void testWriteDbDataToStream() throws Exception { Path expectedLog = Paths.get(compactionLogDir, "expected" + COMPACTION_LOG_FILE_NAME_SUFFIX); String expectedLogStr = truncateFileName(metaDirLength, expectedLog); - Path unExpectedLog = Paths.get(compactionLogDir, "unexpected" + - COMPACTION_LOG_FILE_NAME_SUFFIX); - String unExpectedLogStr = truncateFileName(metaDirLength, unExpectedLog); Path expectedSst = Paths.get(sstBackupDir, "expected.sst"); String expectedSstStr = truncateFileName(metaDirLength, expectedSst); - Path unExpectedSst = Paths.get(sstBackupDir, "unexpected.sst"); - String unExpectedSstStr = truncateFileName(metaDirLength, unExpectedSst); // put "expected" fabricated files onto the fs before the files get // copied to the temp dir. @@ -436,15 +431,6 @@ public void testWriteDbDataToStream() throws Exception { // with the snapshot data. doNothing().when(checkpoint).cleanupCheckpoint(); realCheckpoint.set(checkpoint); - - // put "unexpected" fabricated files onto the fs after the files - // get copied to the temp dir. Since these appear in the "real" - // dir after the copy, they shouldn't exist in the final file - // set. That will show that the copy only happened from the temp dir. - Files.write(unExpectedLog, - "fabricatedData".getBytes(StandardCharsets.UTF_8)); - Files.write(unExpectedSst, - "fabricatedData".getBytes(StandardCharsets.UTF_8)); return checkpoint; }); @@ -460,10 +446,6 @@ public void testWriteDbDataToStream() throws Exception { long tmpHardLinkFileCount = tmpHardLinkFileCount(); omDbCheckpointServletMock.doGet(requestMock, responseMock); assertEquals(tmpHardLinkFileCount, tmpHardLinkFileCount()); - - // Verify that tarball request count reaches to zero once doGet completes. - assertEquals(0, - dbStore.getRocksDBCheckpointDiffer().getTarballRequestCount()); dbCheckpoint = realCheckpoint.get(); // Untar the file into a temp folder to be examined. @@ -528,15 +510,7 @@ public void testWriteDbDataToStream() throws Exception { getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), metaDirLength); assertThat(finalFullSet).contains(expectedLogStr); assertThat(finalFullSet).contains(expectedSstStr); - assertThat(initialFullSet).contains(unExpectedLogStr); - assertThat(initialFullSet).contains(unExpectedSstStr); - - // Remove the dummy files that should not have been copied over - // from the expected data. - initialFullSet.remove(unExpectedLogStr); - initialFullSet.remove(unExpectedSstStr); - assertEquals(initialFullSet, finalFullSet, - "expected snapshot files not found"); + assertEquals(initialFullSet, finalFullSet, "expected snapshot files not found"); } private static long tmpHardLinkFileCount() throws IOException { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index cc8acc483406..fddf11dbcfa3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -221,43 +221,24 @@ public static Map normalizeExcludeList( } /** - * Pauses rocksdb compaction threads while creating copies of - * compaction logs and hard links of sst backups. + * Copies compaction logs and hard links of sst backups to tmpDir. * @param tmpdir - Place to create copies/links * @param flush - Whether to flush the db or not. * @return Checkpoint containing snapshot entries expected. */ @Override - public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) - throws IOException { - DBCheckpoint checkpoint; - + public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) throws IOException { // make tmp directories to contain the copies RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer(); - DirectoryData sstBackupDir = new DirectoryData(tmpdir, - differ.getSSTBackupDir()); - DirectoryData compactionLogDir = new DirectoryData(tmpdir, - differ.getCompactionLogDir()); - - long startTime = System.currentTimeMillis(); - long pauseCounter = PAUSE_COUNTER.incrementAndGet(); - - try { - LOG.info("Compaction pausing {} started.", pauseCounter); - // Pause compactions, Copy/link files and get checkpoint. - differ.incrementTarballRequestCount(); - FileUtils.copyDirectory(compactionLogDir.getOriginalDir(), - compactionLogDir.getTmpDir()); - OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(), - sstBackupDir.getTmpDir()); - checkpoint = getDbStore().getCheckpoint(flush); - } finally { - // Unpause the compaction threads. - differ.decrementTarballRequestCountAndNotify(); - long elapsedTime = System.currentTimeMillis() - startTime; - LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter, elapsedTime); - } - return checkpoint; + DirectoryData sstBackupDir = new DirectoryData(tmpdir, differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, differ.getCompactionLogDir()); + + // Create checkpoint and then copy the files so that it has all the compaction entries and files. + DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush); + FileUtils.copyDirectory(compactionLogDir.getOriginalDir(), compactionLogDir.getTmpDir()); + OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(), sstBackupDir.getTmpDir()); + + return dbCheckpoint; }