diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3bc86c370c21..462a844f28c7 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1518,6 +1518,43 @@
+
+ ozone.compaction.service.enabled
+ false
+ OZONE, OM, PERFORMANCE
+
+ Enable or disable a background job that periodically compacts rocksdb tables flagged for compaction.
+
+
+
+ ozone.om.compaction.service.run.interval
+ 6h
+ OZONE, OM, PERFORMANCE
+
+ A background job that periodically compacts rocksdb tables flagged for compaction.
+ Unit could be defined with postfix (ns,ms,s,m,h,d)
+
+
+
+ ozone.om.compaction.service.timeout
+ 10m
+ OZONE, OM, PERFORMANCE
+ A timeout value of compaction service. If this is set
+ greater than 0, the service will stop waiting for compaction
+ completion after this time.
+ Unit could be defined with postfix (ns,ms,s,m,h,d)
+
+
+
+ ozone.om.compaction.service.columnfamilies
+ keyTable,fileTable,directoryTable,deletedTable,deletedDirectoryTable,multipartInfoTable
+ OZONE, OM, PERFORMANCE
+ A comma separated, no spaces list of all the column families
+ that are compacted by the compaction service.
+ If this is empty, no column families are compacted.
+
+
+
ozone.om.snapshot.rocksdb.metrics.enabled
false
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 2138db275a04..b45fc26462f0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -627,4 +627,22 @@ private OMConfigKeys() {
public static final String OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY =
"ozone.om.edekcacheloader.max-retries";
public static final int OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT = 10;
+
+ /**
+ * Configuration properties for Compaction Service.
+ */
+ public static final String OZONE_OM_COMPACTION_SERVICE_ENABLED = "ozone.compaction.service.enabled";
+ public static final boolean OZONE_OM_COMPACTION_SERVICE_ENABLED_DEFAULT = false;
+ public static final String OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL =
+ "ozone.om.compaction.service.run.interval";
+ public static final long OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT
+ = TimeUnit.HOURS.toMillis(6);
+
+ public static final String OZONE_OM_COMPACTION_SERVICE_TIMEOUT
+ = "ozone.om.compaction.service.timeout";
+ public static final String OZONE_OM_COMPACTION_SERVICE_TIMEOUT_DEFAULT = "10m";
+ public static final String OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES
+ = "ozone.om.compaction.service.columnfamilies";
+ public static final String OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES_DEFAULT =
+ "keyTable,fileTable,directoryTable,deletedTable,deletedDirectoryTable,multipartInfoTable";
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index d25535b151d4..c20dd05be3e7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
+import org.apache.hadoop.ozone.om.service.CompactionService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
@@ -302,4 +303,10 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
* @return Background service.
*/
SnapshotDirectoryCleaningService getSnapshotDirectoryService();
+
+ /**
+ * Returns the instance of CompactionService.
+ * @return BackgroundService
+ */
+ CompactionService getCompactionService();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index f5a984bf402a..26d9b9b21aee 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -40,6 +40,14 @@
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MPU_CLEANUP_SERVICE_TIMEOUT;
@@ -80,6 +88,7 @@
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -150,6 +159,7 @@
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
+import org.apache.hadoop.ozone.om.service.CompactionService;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.MultipartUploadCleanupService;
@@ -198,6 +208,7 @@ public class KeyManagerImpl implements KeyManager {
private BackgroundService multipartUploadCleanupService;
private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
private DNSToSwitchMapping dnsToSwitchMapping;
+ private CompactionService compactionService;
public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
OzoneConfiguration conf, OMPerformanceMetrics metrics) {
@@ -227,6 +238,10 @@ public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
@Override
public void start(OzoneConfiguration configuration) {
+ boolean isCompactionEnabled = configuration.getBoolean(OZONE_OM_COMPACTION_SERVICE_ENABLED,
+ OZONE_OM_COMPACTION_SERVICE_ENABLED_DEFAULT);
+ startCompactionService(configuration, isCompactionEnabled);
+
boolean isSnapshotDeepCleaningEnabled = configuration.getBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED,
OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT);
if (keyDeletingService == null) {
@@ -363,6 +378,27 @@ public void start(OzoneConfiguration configuration) {
: new CachedDNSToSwitchMapping(newInstance));
}
+ private void startCompactionService(OzoneConfiguration configuration,
+ boolean isCompactionServiceEnabled) {
+ if (compactionService == null && isCompactionServiceEnabled) {
+ long compactionInterval = configuration.getTimeDuration(
+ OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL,
+ OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = configuration.getTimeDuration(
+ OZONE_OM_COMPACTION_SERVICE_TIMEOUT,
+ OZONE_OM_COMPACTION_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ String compactionColumnFamilies = configuration.get(
+ OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES,
+ OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES_DEFAULT);
+ String[] tables = compactionColumnFamilies.split(",");
+ compactionService = new CompactionService(ozoneManager, TimeUnit.MILLISECONDS,
+ compactionInterval, serviceTimeout, Arrays.asList(tables));
+ compactionService.start();
+ }
+ }
+
KeyProviderCryptoExtension getKMSProvider() {
return kmsProvider;
}
@@ -397,6 +433,10 @@ public void stop() throws IOException {
snapshotDirectoryCleaningService.shutdown();
snapshotDirectoryCleaningService = null;
}
+ if (compactionService != null) {
+ compactionService.shutdown();
+ compactionService = null;
+ }
}
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -807,6 +847,11 @@ public SnapshotDirectoryCleaningService getSnapshotDirectoryService() {
return snapshotDirectoryCleaningService;
}
+ @Override
+ public CompactionService getCompactionService() {
+ return compactionService;
+ }
+
public boolean isSstFilteringSvcEnabled() {
long serviceInterval = ozoneManager.getConfiguration()
.getTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java
new file mode 100644
index 000000000000..dfcc32578adf
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the background service to compact OM rocksdb tables.
+ */
+public class CompactionService extends BackgroundService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CompactionService.class);
+
+ // Use only a single thread for Compaction.
+ private static final int COMPACTOR_THREAD_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private final OMMetadataManager omMetadataManager;
+ private final AtomicLong numCompactions;
+ private final AtomicBoolean suspended;
+ // list of tables that can be compacted
+ private final List compactableTables;
+
+ public CompactionService(OzoneManager ozoneManager, TimeUnit unit, long interval, long timeout,
+ List tables) {
+ super("CompactionService", interval, unit,
+ COMPACTOR_THREAD_POOL_SIZE, timeout,
+ ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.omMetadataManager = this.ozoneManager.getMetadataManager();
+
+ this.numCompactions = new AtomicLong(0);
+ this.suspended = new AtomicBoolean(false);
+ this.compactableTables = validateTables(tables);
+ }
+
+ private List validateTables(List tables) {
+ if (tables == null || tables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List validTables = new ArrayList<>();
+ Set allTableNames = new HashSet<>(omMetadataManager.listTableNames());
+ for (String table : tables) {
+ if (allTableNames.contains(table)) {
+ validTables.add(table);
+ } else {
+ LOG.warn("CompactionService: Table \"{}\" not found in OM metadata. Skipping this table.", table);
+ }
+ }
+ if (validTables.isEmpty()) {
+ LOG.error("CompactionService: No valid compaction tables found. Failing initialization.");
+ throw new IllegalArgumentException("CompactionService: None of the provided tables are valid.");
+ }
+ return Collections.unmodifiableList(validTables);
+ }
+
+ /**
+ * Suspend the service (for testing).
+ */
+ @VisibleForTesting
+ public void suspend() {
+ suspended.set(true);
+ }
+
+ /**
+ * Resume the service if suspended (for testing).
+ */
+ @VisibleForTesting
+ public void resume() {
+ suspended.set(false);
+ }
+
+ @VisibleForTesting
+ public List getCompactableTables() {
+ return compactableTables;
+ }
+
+ /**
+ * Returns the number of manual compactions performed.
+ *
+ * @return long count.
+ */
+ @VisibleForTesting
+ public long getNumCompactions() {
+ return numCompactions.get();
+ }
+
+ @Override
+ public synchronized BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ for (String tableName : compactableTables) {
+ queue.add(new CompactTask(tableName));
+ }
+ return queue;
+ }
+
+ private boolean shouldRun() {
+ return !suspended.get();
+ }
+
+ protected void compactFully(String tableName) throws IOException {
+ long startTime = Time.monotonicNow();
+ LOG.info("Compacting column family: {}", tableName);
+ try (ManagedCompactRangeOptions options = new ManagedCompactRangeOptions()) {
+ options.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+ options.setExclusiveManualCompaction(true);
+ RocksDatabase rocksDatabase = ((RDBStore) omMetadataManager.getStore()).getDb();
+
+ try {
+ // Find CF Handler
+ RocksDatabase.ColumnFamily columnFamily = rocksDatabase.getColumnFamily(tableName);
+ rocksDatabase.compactRange(columnFamily, null, null, options);
+ LOG.info("Compaction of column family: {} completed in {} ms",
+ tableName, Time.monotonicNow() - startTime);
+ } catch (NullPointerException ex) {
+ LOG.error("Unable to trigger compaction for \"{}\". Column family not found ", tableName);
+ throw new IOException("Column family \"" + tableName + "\" not found.");
+ }
+ }
+ }
+
+ private class CompactTask implements BackgroundTask {
+ private final String tableName;
+
+ CompactTask(String tableName) {
+ this.tableName = tableName;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // trigger full compaction for the specified table.
+ if (!shouldRun()) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ LOG.debug("Running CompactTask");
+
+ compactFully(tableName);
+ numCompactions.incrementAndGet();
+ return () -> 1;
+ }
+ }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java
new file mode 100644
index 000000000000..32b41c9ff120
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestCompactionService.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class TestCompactionService {
+ private static final Logger LOG = LoggerFactory.getLogger(TestCompactionService.class);
+
+ private static final int SERVICE_INTERVAL = 1;
+ private static final int WAIT_TIME = (int) Duration.ofSeconds(10).toMillis();
+ private OzoneManager ozoneManager;
+ private OMMetadataManager metadataManager;
+
+ @BeforeAll
+ void setup(@TempDir Path tempDir) {
+ ExitUtils.disableSystemExit();
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+ ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString());
+ conf.setBoolean(OZONE_OM_COMPACTION_SERVICE_ENABLED, true);
+ conf.setTimeDuration(OZONE_OM_COMPACTION_SERVICE_RUN_INTERVAL,
+ SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ conf.setQuietMode(false);
+
+ ozoneManager = mock(OzoneManager.class);
+ metadataManager = mock(OMMetadataManager.class);
+ when(ozoneManager.getMetadataManager()).thenReturn(metadataManager);
+ TypedTable table = mock(TypedTable.class);
+
+ Set tables = new HashSet<>();
+ tables.add("keyTable");
+ tables.add("fileTable");
+ tables.add("directoryTable");
+ tables.add("deletedTable");
+ tables.add("deletedDirectoryTable");
+ tables.add("multipartInfoTable");
+ when(metadataManager.getTable(anyString())).thenReturn(table);
+ when(metadataManager.listTableNames()).thenReturn(tables);
+ }
+
+ /**
+ * Add a compaction request and verify that it is processed.
+ *
+ * @throws IOException - on Failure.
+ */
+ @Timeout(300)
+ @Test
+ public void testCompactSuccessfully() throws Exception {
+
+ CompactionService compactionService = getCompactionService(Arrays.asList(
+ OMConfigKeys.OZONE_OM_COMPACTION_SERVICE_COLUMNFAMILIES_DEFAULT.split(",")));
+ compactionService.start();
+
+ compactionService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL);
+ final long oldkeyCount = compactionService.getNumCompactions();
+ LOG.info("oldkeyCount={}", oldkeyCount);
+
+ final int compactionTriggered = 1;
+
+ compactionService.resume();
+
+ GenericTestUtils.waitFor(
+ () -> compactionService.getNumCompactions() >= oldkeyCount + compactionTriggered,
+ SERVICE_INTERVAL, WAIT_TIME);
+ }
+
+ @Timeout(300)
+ @Test
+ public void testCompactSkipInvalidTable() throws Exception {
+
+ List compactTables = new ArrayList<>();
+ compactTables.add("fileTable");
+ compactTables.add("keyTable");
+ compactTables.add("invalidTable");
+
+
+ CompactionService compactionService = getCompactionService(compactTables);
+
+ // compaction should start, but with only the valid tables
+ compactionService.start();
+ compactionService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL);
+
+ assertTrue(compactionService.getCompactableTables().contains("fileTable"));
+ assertTrue(compactionService.getCompactableTables().contains("keyTable"));
+ assertFalse(compactionService.getCompactableTables().contains("invalidTable"));
+
+ final long oldkeyCount = compactionService.getNumCompactions();
+ LOG.info("oldkeyCount={}", oldkeyCount);
+
+ final int compactionTriggered = 1;
+
+ compactionService.resume();
+
+ GenericTestUtils.waitFor(
+ () -> compactionService.getNumCompactions() >= oldkeyCount + compactionTriggered,
+ SERVICE_INTERVAL, WAIT_TIME);
+ }
+
+ @Timeout(300)
+ @Test
+ public void testCompactFailure() {
+
+ List compactTables = new ArrayList<>();
+ compactTables.add("invalidTable2");
+ compactTables.add("invalidTable1");
+
+ // initialization should fail if all tables are invalid
+ assertThrows(IllegalArgumentException.class,
+ () -> getCompactionService(compactTables));
+ }
+
+ private CompactionService getCompactionService(List compactTables) {
+ CompactionService compactionService = new CompactionService(ozoneManager, TimeUnit.MILLISECONDS,
+ TimeUnit.SECONDS.toMillis(SERVICE_INTERVAL), TimeUnit.SECONDS.toMillis(60), compactTables) {
+
+ @Override
+ public void compactFully(String tableName) throws IOException {
+ LOG.info("Compacting column family: {}", tableName);
+ }
+ };
+ return compactionService;
+ }
+}