diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index a09626833765..455dabf6ac04 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -52,6 +52,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -267,6 +268,36 @@ public void start() { reconTaskController.getRegisteredTasks() .values() .forEach(ReconOmTask::init); + + // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with + // lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks. + ReconTaskStatusUpdater deltaTaskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name()); + + Map reconOmTaskMap = reconTaskController.getRegisteredTasks() + .entrySet() + .stream() + .filter(entry -> { + String taskName = entry.getKey(); + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + + return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1 + !taskStatusUpdater.getLastUpdatedSeqNumber() + .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2 + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map + if (!reconOmTaskMap.isEmpty()) { + LOG.info("Task name and last updated sequence number of tasks, that are not matching with " + + "the last updated sequence number of OmDeltaRequest task:\n"); + LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + reconOmTaskMap.keySet() + .forEach(taskName -> { + LOG.info("{} -> {}", taskName, + taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); + + }); + } + reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); startSyncDataFromOM(initialDelay); } @@ -633,7 +664,7 @@ public boolean syncDataFromOM() { // Reinitialize tasks that are listening. LOG.info("Calling reprocess on Recon tasks."); - reconTaskController.reInitializeTasks(omMetadataManager); + reconTaskController.reInitializeTasks(omMetadataManager, null); // Update health status in ReconContext reconContext.updateHealthStatus(new AtomicBoolean(true)); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java index 9727e20f263c..8d956f487b95 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -40,10 +40,14 @@ void consumeOMEvents(OMUpdateEventBatch events, OMMetadataManager omMetadataManager); /** - * Pass on the handle to a new OM DB instance to the registered tasks. - * @param omMetadataManager OM Metadata Manager instance + * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance. + * + * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization. + * @param reconOmTaskMap a map of Recon OM tasks, which we would like to reinitialize. + * If {@code reconOmTaskMap} is null, all registered Recon OM tasks + * will be reinitialized. */ - void reInitializeTasks(ReconOMMetadataManager omMetadataManager); + void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map reconOmTaskMap); /** * Get set of registered tasks. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index e289b6ae15b8..966ee95f0b3d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -157,19 +157,32 @@ private void ignoreFailedTasks(List failedTasks) { } } + /** + * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance. + * + * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization. + * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match + * the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task. + * These tasks will be reinitialized to process the delta OM DB updates + * received in the last run of 'OmDeltaRequest'. + * If {@code reconOmTaskMap} is null, all registered Recon OM tasks + * will be reinitialized. + */ @Override - public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) { + public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager, + Map reconOmTaskMap) { Collection> tasks = new ArrayList<>(); + Map localReconOmTaskMap = reconOmTaskMap; + if (reconOmTaskMap == null) { + localReconOmTaskMap = reconOmTasks; + } ReconConstants.resetTableTruncatedFlags(); - for (Map.Entry taskEntry : - reconOmTasks.entrySet()) { - ReconOmTask task = taskEntry.getValue(); - ReconTaskStatusUpdater taskStatusUpdater = - taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); + + localReconOmTaskMap.values().forEach(task -> { + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); taskStatusUpdater.recordRunStart(); - tasks.add(new NamedCallableTask<>(task.getTaskName(), - () -> task.reprocess(omMetadataManager))); - } + tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager))); + }); try { CompletableFuture.allOf(tasks.stream() diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java index 34e6e005b4fd..bc1b4ce48e49 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java @@ -74,6 +74,14 @@ public void setIsCurrentTaskRunning(int isCurrentTaskRunning) { this.reconTaskStatus.setIsCurrentTaskRunning(isCurrentTaskRunning); } + public Long getLastUpdatedSeqNumber() { + return this.reconTaskStatus.getLastUpdatedSeqNumber(); + } + + public String getTaskName() { + return this.taskName; + } + /** * Helper function to update TASK_STATUS table with task start values. * Set the isCurrentTaskRunning as true, update the timestamp. diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 3346edb09bf1..346f5c3289e1 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -477,7 +477,7 @@ public void testSyncDataFromOMFullSnapshot( ReconTaskController reconTaskControllerMock = getMockTaskController(); doNothing().when(reconTaskControllerMock) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager(); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = @@ -494,7 +494,7 @@ public void testSyncDataFromOMFullSnapshot( verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture()); assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue()); verify(reconTaskControllerMock, times(1)) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests()); } @@ -544,7 +544,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( ReconTaskController reconTaskControllerMock = getMockTaskController(); doNothing().when(reconTaskControllerMock) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager(); OzoneManagerProtocol protocol = getMockOzoneManagerClientWithThrow(); @@ -562,7 +562,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture()); assertEquals(OmSnapshotRequest.name(), captor.getValue()); verify(reconTaskControllerMock, times(1)) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests()); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index ad7eafd1600b..a3062a127548 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -215,7 +215,7 @@ public void testReInitializeTasks() throws Exception { long startTime = System.currentTimeMillis(); reconTaskController.registerTask(reconOmTaskMock); - reconTaskController.reInitializeTasks(omMetadataManagerMock); + reconTaskController.reInitializeTasks(omMetadataManagerMock, null); long endTime = System.currentTimeMillis(); verify(reconOmTaskMock, times(1))