diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java index 286c1596fa46..5353a8815cc8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java @@ -22,6 +22,7 @@ import java.util.UUID; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; @@ -29,6 +30,7 @@ import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer; import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory; +import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.util.SeekableIterator; /** @@ -57,6 +59,21 @@ void reinitWithNewContainerDataFromOm(Map void storeContainerKeyMapping(ContainerKeyPrefix containerKeyPrefix, Integer count) throws IOException; + /** + * Returns staged DB container metadata manager. + * + * @param stagedReconDbStore staged Recon DB store + * @return ReconContainerMetadataManager + */ + ReconContainerMetadataManager getStagedReconContainerMetadataManager(DBStore stagedReconDbStore); + + /** + * reinitialize the ReconContainerMetadataManage. + * + * @param reconDBProvider recon DB provider to reinitialize with. + */ + void reinitialize(ReconDBProvider reconDBProvider); + /** * Store the container to Key prefix mapping into a batch. * diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconNamespaceSummaryManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconNamespaceSummaryManager.java index 5c6d808fc8ad..e166466cd568 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconNamespaceSummaryManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconNamespaceSummaryManager.java @@ -20,9 +20,11 @@ import java.io.IOException; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; /** * Interface for DB operations on NSSummary. @@ -30,6 +32,10 @@ @InterfaceStability.Unstable public interface ReconNamespaceSummaryManager { + ReconNamespaceSummaryManager getStagedNsSummaryManager(DBStore dbStore) throws IOException; + + void reinitialize(ReconDBProvider reconDBProvider) throws IOException; + void clearNSSummaryTable() throws IOException; @Deprecated diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java index ec814bc7e040..91a80ee40e36 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java @@ -83,17 +83,34 @@ public class ReconContainerMetadataManagerImpl private DBStore containerDbStore; - @Inject private Configuration sqlConfiguration; - @Inject private ReconOMMetadataManager omMetadataManager; @Inject - public ReconContainerMetadataManagerImpl(ReconDBProvider reconDBProvider, - Configuration sqlConfiguration) { - containerDbStore = reconDBProvider.getDbStore(); + public ReconContainerMetadataManagerImpl( + ReconDBProvider reconDBProvider, Configuration sqlConfiguration, ReconOMMetadataManager omMetadataManager) { + this(reconDBProvider.getDbStore(), sqlConfiguration, omMetadataManager); + } + + private ReconContainerMetadataManagerImpl( + DBStore reconDBStore, Configuration sqlConfiguration, ReconOMMetadataManager omMetadataManager) { + containerDbStore = reconDBStore; globalStatsDao = new GlobalStatsDao(sqlConfiguration); + this.sqlConfiguration = sqlConfiguration; + this.omMetadataManager = omMetadataManager; + initializeTables(); + } + + @Override + public ReconContainerMetadataManager getStagedReconContainerMetadataManager( + DBStore stagedReconDbStore) { + return new ReconContainerMetadataManagerImpl(stagedReconDbStore, sqlConfiguration, omMetadataManager); + } + + @Override + public void reinitialize(ReconDBProvider reconDBProvider) { + containerDbStore = reconDBProvider.getDbStore(); initializeTables(); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java index e1f7dd21ea4b..ab84e990634e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java @@ -24,7 +24,10 @@ import com.google.inject.ProvisionException; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import javax.inject.Inject; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; @@ -39,6 +42,8 @@ * Provider for Recon's RDB. */ public class ReconDBProvider { + private static final String STAGED_EXT = ".staged"; + private static final String BACKUP_EXT = ".backup"; private OzoneConfiguration configuration; private ReconUtils reconUtils; private DBStore dbStore; @@ -54,20 +59,47 @@ public class ReconDBProvider { this.dbStore = provideReconDB(); } + private ReconDBProvider(OzoneConfiguration configuration, ReconUtils reconUtils, DBStore dbStore) { + this.configuration = configuration; + this.reconUtils = reconUtils; + this.dbStore = dbStore; + } + + public ReconDBProvider getStagedReconDBProvider() throws IOException { + File reconDbDir = reconUtils.getReconDbDir(configuration, OZONE_RECON_DB_DIR); + String stagedDbName = RECON_CONTAINER_KEY_DB + STAGED_EXT; + FileUtils.deleteDirectory(new File(reconDbDir, stagedDbName)); + DBStore db = initializeDBStore(configuration, stagedDbName); + if (db == null) { + throw new ProvisionException("Unable to initialize staged recon container DBStore"); + } + return new ReconDBProvider(configuration, reconUtils, db); + } + public DBStore provideReconDB() { DBStore db; - File reconDbDir = - reconUtils.getReconDbDir(configuration, OZONE_RECON_DB_DIR); - File lastKnownContainerKeyDb = - reconUtils.getLastKnownDB(reconDbDir, RECON_CONTAINER_KEY_DB); - if (lastKnownContainerKeyDb != null) { - LOG.info("Last known Recon DB : {}", - lastKnownContainerKeyDb.getAbsolutePath()); - db = initializeDBStore(configuration, - lastKnownContainerKeyDb.getName()); - } else { - db = getNewDBStore(configuration); + try { + // handle recover of last known container as old format removing timestamp + File reconDbDir = reconUtils.getReconDbDir(configuration, OZONE_RECON_DB_DIR); + File lastKnownContainerKeyDb = reconUtils.getLastKnownDB(reconDbDir, RECON_CONTAINER_KEY_DB); + if (lastKnownContainerKeyDb != null) { + LOG.info("Last known Recon DB : {}", lastKnownContainerKeyDb.getAbsolutePath()); + Files.move(lastKnownContainerKeyDb.toPath(), new File(reconDbDir, RECON_CONTAINER_KEY_DB).toPath(), + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + // if db does not exist, check if a backup exists and restore it. This can happen when replace with + // staged db fails and backup is not restored at that point of time. + if (!new File(reconDbDir, RECON_CONTAINER_KEY_DB).exists() && + new File(reconDbDir, RECON_CONTAINER_KEY_DB + BACKUP_EXT).exists()) { + LOG.info("Recon DB backup found, restoring from backup."); + Files.move(new File(reconDbDir, RECON_CONTAINER_KEY_DB + BACKUP_EXT).toPath(), + new File(reconDbDir, RECON_CONTAINER_KEY_DB).toPath(), + StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + throw new ProvisionException("Unable to recover container DB path.", e); } + db = initializeDBStore(configuration, RECON_CONTAINER_KEY_DB); if (db == null) { throw new ProvisionException("Unable to provide instance of DBStore " + "store."); @@ -92,11 +124,6 @@ static void truncateTable(Table table) throws IOException { } } - static DBStore getNewDBStore(OzoneConfiguration configuration) { - String dbName = RECON_CONTAINER_KEY_DB + "_" + System.currentTimeMillis(); - return initializeDBStore(configuration, dbName); - } - private static DBStore initializeDBStore(OzoneConfiguration configuration, String dbName) { DBStore dbStore = null; @@ -115,4 +142,29 @@ public void close() throws Exception { dbStore = null; } } + + public void replaceStagedDb(ReconDBProvider stagedReconDBProvider) throws Exception { + File dbPath = dbStore.getDbLocation(); + File stagedDbPath = stagedReconDBProvider.getDbStore().getDbLocation(); + File backupPath = new File(dbPath.getAbsolutePath() + BACKUP_EXT); + stagedReconDBProvider.close(); + try { + FileUtils.deleteDirectory(backupPath); + close(); + Files.move(dbPath.toPath(), backupPath.toPath(), StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + Files.move(stagedDbPath.toPath(), dbPath.toPath(), StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + dbStore = initializeDBStore(configuration, dbPath.getName()); + } catch (Exception e) { + if (dbStore == null) { + Files.move(dbPath.toPath(), stagedDbPath.toPath(), StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + Files.move(backupPath.toPath(), dbPath.toPath(), StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING); + dbStore = initializeDBStore(configuration, dbPath.getName()); + } + throw e; + } + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconNamespaceSummaryManagerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconNamespaceSummaryManagerImpl.java index 434d43fde3e3..536fce1e8fe1 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconNamespaceSummaryManagerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconNamespaceSummaryManagerImpl.java @@ -44,11 +44,27 @@ public class ReconNamespaceSummaryManagerImpl @Inject public ReconNamespaceSummaryManagerImpl(ReconDBProvider reconDBProvider, NSSummaryTask nsSummaryTask) throws IOException { - namespaceDbStore = reconDBProvider.getDbStore(); + this(reconDBProvider.getDbStore(), nsSummaryTask); + } + + private ReconNamespaceSummaryManagerImpl(DBStore dbStore, NSSummaryTask nsSummaryTask) + throws IOException { + namespaceDbStore = dbStore; this.nsSummaryTable = NAMESPACE_SUMMARY.getTable(namespaceDbStore); this.nsSummaryTask = nsSummaryTask; } + @Override + public ReconNamespaceSummaryManager getStagedNsSummaryManager(DBStore dbStore) throws IOException { + return new ReconNamespaceSummaryManagerImpl(dbStore, nsSummaryTask); + } + + @Override + public void reinitialize(ReconDBProvider reconDBProvider) throws IOException { + namespaceDbStore = reconDBProvider.getDbStore(); + this.nsSummaryTable = NAMESPACE_SUMMARY.getTable(namespaceDbStore); + } + @Override public void clearNSSummaryTable() throws IOException { truncateTable(nsSummaryTable); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java index fd8df1a1e4f4..60499ac33c8a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; +import java.io.IOException; import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; /** @@ -40,6 +43,13 @@ public ContainerKeyMapperTaskFSO(ReconContainerMetadataManager reconContainerMet this.ozoneConfiguration = configuration; } + @Override + public ReconOmTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, DBStore stagedReconDbStore) + throws IOException { + return new ContainerKeyMapperTaskFSO( + reconContainerMetadataManager.getStagedReconContainerMetadataManager(stagedReconDbStore), ozoneConfiguration); + } + @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java index 178ee8b02867..e69601040d60 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; +import java.io.IOException; import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; /** @@ -40,6 +43,13 @@ public ContainerKeyMapperTaskOBS(ReconContainerMetadataManager reconContainerMet this.ozoneConfiguration = configuration; } + @Override + public ReconOmTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, DBStore stagedReconDbStore) + throws IOException { + return new ContainerKeyMapperTaskOBS( + reconContainerMetadataManager.getStagedReconContainerMetadataManager(stagedReconDbStore), ozoneConfiguration); + } + @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java index ed21821492cb..1291a6cc9e7c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java @@ -39,6 +39,7 @@ import javax.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; @@ -76,6 +77,7 @@ public class NSSummaryTask implements ReconOmTask { private final ReconNamespaceSummaryManager reconNamespaceSummaryManager; private final ReconOMMetadataManager reconOMMetadataManager; + private final OzoneConfiguration ozoneConfiguration; private final NSSummaryTaskWithFSO nsSummaryTaskWithFSO; private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy; private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS; @@ -98,6 +100,7 @@ public NSSummaryTask(ReconNamespaceSummaryManager ozoneConfiguration) { this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; this.reconOMMetadataManager = reconOMMetadataManager; + this.ozoneConfiguration = ozoneConfiguration; long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong( OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); @@ -112,6 +115,14 @@ public NSSummaryTask(ReconNamespaceSummaryManager reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold); } + @Override + public NSSummaryTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, DBStore stagedReconDbStore) + throws IOException { + ReconNamespaceSummaryManager stagedNsSummaryManager = + reconNamespaceSummaryManager.getStagedNsSummaryManager(stagedReconDbStore); + return new NSSummaryTask(stagedNsSummaryManager, stagedOmMetadataManager, ozoneConfiguration); + } + @Override public String getTaskName() { return "NSSummaryTask"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java index 395fdf6b1e00..ccf5b948a8bd 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java @@ -17,9 +17,12 @@ package org.apache.hadoop.ozone.recon.tasks; +import java.io.IOException; import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; /** * Interface used to denote a Recon task that needs to act on OM DB events. @@ -62,6 +65,18 @@ TaskResult process(OMUpdateEventBatch events, */ TaskResult reprocess(OMMetadataManager omMetadataManager); + /** + * Returns a staged task that can be used to reprocess events. + * @param stagedOmMetadataManager om metadata manager for staged OM DB + * @param stagedReconDbStore recon DB store for staged + * @return task that can be used to reprocess events + * @throws IOException exception + */ + default ReconOmTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, DBStore stagedReconDbStore) + throws IOException { + return this; + } + /** * Represents the result of a task execution, including the task name, * sub-task seek positions, and success status. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index 966ee95f0b3d..dbdd781e290b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -35,12 +36,16 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.ReconConstants; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.tasks.types.NamedCallableTask; import org.apache.hadoop.ozone.recon.tasks.types.TaskExecutionException; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater; @@ -55,6 +60,10 @@ public class ReconTaskControllerImpl implements ReconTaskController { private static final Logger LOG = LoggerFactory.getLogger(ReconTaskControllerImpl.class); + private static final String REPROCESS_STAGING = "REPROCESS_STAGING"; + private final ReconDBProvider reconDBProvider; + private final ReconContainerMetadataManager reconContainerMetadataManager; + private final ReconNamespaceSummaryManager reconNamespaceSummaryManager; private Map reconOmTasks; private ExecutorService executorService; @@ -66,7 +75,13 @@ public class ReconTaskControllerImpl implements ReconTaskController { @Inject public ReconTaskControllerImpl(OzoneConfiguration configuration, Set tasks, - ReconTaskStatusUpdaterManager taskStatusUpdaterManager) { + ReconTaskStatusUpdaterManager taskStatusUpdaterManager, + ReconDBProvider reconDBProvider, + ReconContainerMetadataManager reconContainerMetadataManager, + ReconNamespaceSummaryManager reconNamespaceSummaryManager) { + this.reconDBProvider = reconDBProvider; + this.reconContainerMetadataManager = reconContainerMetadataManager; + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; reconOmTasks = new HashMap<>(); threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY, OZONE_RECON_TASK_THREAD_COUNT_DEFAULT); @@ -171,6 +186,7 @@ private void ignoreFailedTasks(List failedTasks) { @Override public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map reconOmTaskMap) { + LOG.info("Starting Re-initialization of tasks."); Collection> tasks = new ArrayList<>(); Map localReconOmTaskMap = reconOmTaskMap; if (reconOmTaskMap == null) { @@ -178,12 +194,26 @@ public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataMana } ReconConstants.resetTableTruncatedFlags(); + ReconDBProvider stagedReconDBProvider; + try { + ReconTaskStatusUpdater reprocessTaskStatus = taskStatusUpdaterManager.getTaskStatusUpdater(REPROCESS_STAGING); + reprocessTaskStatus.recordRunStart(); + stagedReconDBProvider = reconDBProvider.getStagedReconDBProvider(); + } catch (IOException e) { + LOG.error("Failed to get staged Recon DB provider for reinitialization of tasks.", e); + recordAllTaskStatus(localReconOmTaskMap, -1, -1); + return; + } + localReconOmTaskMap.values().forEach(task -> { ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); taskStatusUpdater.recordRunStart(); - tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager))); + tasks.add(new NamedCallableTask<>(task.getTaskName(), + () -> task.getStagedTask(omMetadataManager, stagedReconDBProvider.getDbStore()) + .reprocess(omMetadataManager))); }); + AtomicBoolean isRunSuccessful = new AtomicBoolean(true); try { CompletableFuture.allOf(tasks.stream() .map(task -> CompletableFuture.supplyAsync(() -> { @@ -197,36 +227,74 @@ public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataMana throw new TaskExecutionException(task.getTaskName(), e); } }, executorService).thenAccept(result -> { - String taskName = result.getTaskName(); - ReconTaskStatusUpdater taskStatusUpdater = - taskStatusUpdaterManager.getTaskStatusUpdater(taskName); if (!result.isTaskSuccess()) { + String taskName = result.getTaskName(); LOG.error("Init failed for task {}.", taskName); - taskStatusUpdater.setLastTaskRunStatus(-1); - } else { - taskStatusUpdater.setLastTaskRunStatus(0); - taskStatusUpdater.setLastUpdatedSeqNumber( - omMetadataManager.getLastSequenceNumberFromDB()); + isRunSuccessful.set(false); } - taskStatusUpdater.recordRunCompletion(); }).exceptionally(ex -> { LOG.error("Task failed with exception: ", ex); + isRunSuccessful.set(false); if (ex.getCause() instanceof TaskExecutionException) { TaskExecutionException taskEx = (TaskExecutionException) ex.getCause(); String taskName = taskEx.getTaskName(); LOG.error("The above error occurred while trying to execute task: {}", taskName); - - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); - taskStatusUpdater.setLastTaskRunStatus(-1); - taskStatusUpdater.recordRunCompletion(); } return null; })).toArray(CompletableFuture[]::new)).join(); } catch (CompletionException ce) { LOG.error("Completing all tasks failed with exception ", ce); + isRunSuccessful.set(false); } catch (CancellationException ce) { LOG.error("Some tasks were cancelled with exception", ce); + isRunSuccessful.set(false); + } + + if (isRunSuccessful.get()) { + try { + reconDBProvider.replaceStagedDb(stagedReconDBProvider); + reconNamespaceSummaryManager.reinitialize(reconDBProvider); + reconContainerMetadataManager.reinitialize(reconDBProvider); + recordAllTaskStatus(localReconOmTaskMap, 0, omMetadataManager.getLastSequenceNumberFromDB()); + LOG.info("Re-initialization of tasks completed successfully."); + } catch (Exception e) { + LOG.error("Re-initialization of tasks failed.", e); + recordAllTaskStatus(localReconOmTaskMap, -1, -1); + // reinitialize the Recon OM tasks with the original DB provider + try { + reconNamespaceSummaryManager.reinitialize(reconDBProvider); + reconContainerMetadataManager.reinitialize(reconDBProvider); + } catch (IOException ex) { + LOG.error("Re-initialization of task manager failed.", e); + } + } + } else { + LOG.error("Re-initialization of tasks failed."); + try { + stagedReconDBProvider.close(); + } catch (Exception e) { + LOG.error("Close of recon container staged db handler is failed", e); + } + recordAllTaskStatus(localReconOmTaskMap, -1, -1); + } + } + + private void recordAllTaskStatus(Map localReconOmTaskMap, int status, long updateSeqNumber) { + localReconOmTaskMap.values().forEach(task -> { + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); + if (status == 0) { + taskStatusUpdater.setLastUpdatedSeqNumber(updateSeqNumber); + } + taskStatusUpdater.setLastTaskRunStatus(status); + taskStatusUpdater.recordRunCompletion(); + }); + + ReconTaskStatusUpdater reprocessTaskStatus = taskStatusUpdaterManager.getTaskStatusUpdater(REPROCESS_STAGING); + if (status == 0) { + reprocessTaskStatus.setLastUpdatedSeqNumber(updateSeqNumber); } + reprocessTaskStatus.setLastTaskRunStatus(status); + reprocessTaskStatus.recordRunCompletion(); } @Override diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java index d99f0c8656d0..6fd1493464f2 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskControllerIntegration.java @@ -43,9 +43,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState; import org.apache.hadoop.ozone.recon.tasks.ReconOmTask.TaskResult; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater; @@ -91,6 +94,7 @@ void setUp() throws Exception { // Setup other task mock when(mockOtherTask.getTaskName()).thenReturn("MockTask"); + when(mockOtherTask.getStagedTask(any(), any())).thenReturn(mockOtherTask); when(mockOtherTask.reprocess(any())).thenReturn( new TaskResult.Builder() .setTaskName("MockTask") @@ -102,8 +106,12 @@ void setUp() throws Exception { ReconTaskStatusUpdater mockTaskStatusUpdater = mock(ReconTaskStatusUpdater.class); when(mockTaskStatusUpdaterManager.getTaskStatusUpdater(any())).thenReturn(mockTaskStatusUpdater); - taskController = - new ReconTaskControllerImpl(ozoneConfiguration, java.util.Collections.emptySet(), mockTaskStatusUpdaterManager); + ReconContainerMetadataManager reconContainerMgr = mock(ReconContainerMetadataManager.class); + ReconDBProvider reconDbProvider = mock(ReconDBProvider.class); + when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class)); + when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider); + taskController = new ReconTaskControllerImpl(ozoneConfiguration, java.util.Collections.emptySet(), + mockTaskStatusUpdaterManager, reconDbProvider, reconContainerMgr, mockNamespaceSummaryManager); taskController.start(); // Initialize the executor service taskController.registerTask(nsSummaryTask); taskController.registerTask(mockOtherTask); @@ -137,8 +145,14 @@ private NSSummaryTask createTestableNSSummaryTask() { public TaskResult buildTaskResult(boolean success) { return super.buildTaskResult(success); } - - @Override + + @Override + public NSSummaryTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, DBStore stagedReconDbStore) + throws IOException { + return this; + } + + @Override protected TaskResult executeReprocess(OMMetadataManager omMetadataManager, long startTime) { // Simplified test implementation that mimics the real execution flow // but bypasses the complex sub-task execution while maintaining proper state management @@ -476,6 +490,7 @@ void testMixedTaskSuccessFailureScenarios() throws Exception { ReconOmTask failTask = mock(ReconOmTask.class); when(successTask.getTaskName()).thenReturn("SuccessTask"); + when(successTask.getStagedTask(any(), any())).thenReturn(successTask); when(successTask.reprocess(any())).thenReturn( new TaskResult.Builder() .setTaskName("SuccessTask") @@ -483,6 +498,7 @@ void testMixedTaskSuccessFailureScenarios() throws Exception { .build()); when(failTask.getTaskName()).thenReturn("FailTask"); + when(failTask.getStagedTask(any(), any())).thenReturn(failTask); when(failTask.reprocess(any())).thenReturn( new TaskResult.Builder() .setTaskName("FailTask") diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index 1cc0ac3bdc34..56f223e3ae5d 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -28,11 +28,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashSet; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager; import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao; @@ -53,7 +58,7 @@ public TestReconTaskControllerImpl() { } @BeforeEach - public void setUp() { + public void setUp() throws IOException { OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); reconTaskStatusDao = getDao(ReconTaskStatusDao.class); ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManagerMock = mock(ReconTaskStatusUpdaterManager.class); @@ -62,8 +67,13 @@ public void setUp() { String taskName = i.getArgument(0); return new ReconTaskStatusUpdater(reconTaskStatusDao, taskName); }); + ReconDBProvider reconDbProvider = mock(ReconDBProvider.class); + when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class)); + when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider); + ReconContainerMetadataManager reconContainerMgr = mock(ReconContainerMetadataManager.class); + ReconNamespaceSummaryManager nsSummaryManager = mock(ReconNamespaceSummaryManager.class); reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration, new HashSet<>(), - reconTaskStatusUpdaterManagerMock); + reconTaskStatusUpdaterManagerMock, reconDbProvider, reconContainerMgr, nsSummaryManager); reconTaskController.start(); } @@ -207,6 +217,7 @@ public void testReInitializeTasks() throws Exception { ReconOMMetadataManager.class); ReconOmTask reconOmTaskMock = getMockTask("MockTask2"); + when(reconOmTaskMock.getStagedTask(any(), any())).thenReturn(reconOmTaskMock); when(reconOmTaskMock.reprocess(omMetadataManagerMock)) .thenReturn(new ReconOmTask.TaskResult.Builder().setTaskName("MockTask2").setTaskSuccess(true).build()); when(omMetadataManagerMock.getLastSequenceNumberFromDB() @@ -227,6 +238,10 @@ public void testReInitializeTasks() throws Exception { long taskTimeStamp = reconTaskStatus.getLastUpdatedTimestamp(); long seqNumber = reconTaskStatus.getLastUpdatedSeqNumber(); + ReconTaskStatus reprocessStaging = reconTaskStatusDao.findById("REPROCESS_STAGING"); + assertEquals(omMetadataManagerMock.getLastSequenceNumberFromDB(), reprocessStaging.getLastUpdatedSeqNumber()); + assertEquals(0, reprocessStaging.getLastTaskRunStatus()); + assertThat(taskTimeStamp).isGreaterThanOrEqualTo(startTime).isLessThanOrEqualTo(endTime); assertEquals(seqNumber, omMetadataManagerMock.getLastSequenceNumberFromDB());