From 47a48d3a0f7c7bc2795a118f6c12aebd8024d2ef Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Mon, 24 Feb 2025 17:56:56 +0530 Subject: [PATCH 1/4] HDDS-12377. Improve error handling of OM background tasks processing in case of abrupt crash of Recon. --- .../impl/OzoneManagerServiceProviderImpl.java | 31 ++++++++++++++++++- .../recon/tasks/ReconTaskController.java | 4 ++- .../recon/tasks/ReconTaskControllerImpl.java | 13 +++++--- .../tasks/updater/ReconTaskStatusUpdater.java | 8 +++++ .../TestOzoneManagerServiceProviderImpl.java | 8 ++--- .../tasks/TestReconTaskControllerImpl.java | 2 +- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index abe85e476394..bd78c8e7da5c 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -52,6 +52,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -267,6 +268,34 @@ public void start() { reconTaskController.getRegisteredTasks() .values() .forEach(ReconOmTask::init); + + // Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with + // lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks. + ReconTaskStatusUpdater deltaTaskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name()); + + Map reconOmTaskMap = reconTaskController.getRegisteredTasks() + .entrySet() + .stream() + .filter(entry -> { + String taskName = entry.getKey(); + ReconOmTask task = entry.getValue(); + ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + + return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1 + !taskStatusUpdater.getLastUpdatedSeqNumber() + .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2 + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map + LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number not matching with " + + "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n"); + LOG.info(deltaTaskStatusUpdater.getTaskName() + "->" + deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + reconOmTaskMap.keySet() + .forEach(taskName -> { + LOG.info(taskName + "->" + taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); + + }); + reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); startSyncDataFromOM(initialDelay); } @@ -631,7 +660,7 @@ public boolean syncDataFromOM() { // Reinitialize tasks that are listening. LOG.info("Calling reprocess on Recon tasks."); - reconTaskController.reInitializeTasks(omMetadataManager); + reconTaskController.reInitializeTasks(omMetadataManager, null); // Update health status in ReconContext reconContext.updateHealthStatus(new AtomicBoolean(true)); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java index 9727e20f263c..c2b6fd0bc0d5 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -41,9 +41,11 @@ void consumeOMEvents(OMUpdateEventBatch events, /** * Pass on the handle to a new OM DB instance to the registered tasks. + * * @param omMetadataManager OM Metadata Manager instance + * @param reconOmTaskMap */ - void reInitializeTasks(ReconOMMetadataManager omMetadataManager); + void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map reconOmTaskMap); /** * Get set of registered tasks. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index c1d786db1a9d..c27f7a4729c7 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -154,16 +154,19 @@ private void ignoreFailedTasks(List failedTasks) { } @Override - public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) { + public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager, + Map reconOmTaskMap) { Collection>> tasks = new ArrayList<>(); + Map localReconOmTaskMap = reconOmTaskMap; + if (null == reconOmTaskMap) { + localReconOmTaskMap = reconOmTasks; + } ReconConstants.resetTableTruncatedFlags(); - for (Map.Entry taskEntry : - reconOmTasks.entrySet()) { - ReconOmTask task = taskEntry.getValue(); + localReconOmTaskMap.values().forEach(task -> { ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); taskStatusUpdater.recordRunStart(); tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager))); - } + }); try { CompletableFuture.allOf(tasks.stream() diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java index 34e6e005b4fd..bc1b4ce48e49 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdater.java @@ -74,6 +74,14 @@ public void setIsCurrentTaskRunning(int isCurrentTaskRunning) { this.reconTaskStatus.setIsCurrentTaskRunning(isCurrentTaskRunning); } + public Long getLastUpdatedSeqNumber() { + return this.reconTaskStatus.getLastUpdatedSeqNumber(); + } + + public String getTaskName() { + return this.taskName; + } + /** * Helper function to update TASK_STATUS table with task start values. * Set the isCurrentTaskRunning as true, update the timestamp. diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java index 3346edb09bf1..346f5c3289e1 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java @@ -477,7 +477,7 @@ public void testSyncDataFromOMFullSnapshot( ReconTaskController reconTaskControllerMock = getMockTaskController(); doNothing().when(reconTaskControllerMock) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager(); OzoneManagerServiceProviderImpl ozoneManagerServiceProvider = @@ -494,7 +494,7 @@ public void testSyncDataFromOMFullSnapshot( verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture()); assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue()); verify(reconTaskControllerMock, times(1)) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests()); } @@ -544,7 +544,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( ReconTaskController reconTaskControllerMock = getMockTaskController(); doNothing().when(reconTaskControllerMock) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager(); OzoneManagerProtocol protocol = getMockOzoneManagerClientWithThrow(); @@ -562,7 +562,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE( verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture()); assertEquals(OmSnapshotRequest.name(), captor.getValue()); verify(reconTaskControllerMock, times(1)) - .reInitializeTasks(omMetadataManager); + .reInitializeTasks(omMetadataManager, null); assertEquals(1, metrics.getNumSnapshotRequests()); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index 384da46aa16a..5915a05044d4 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -215,7 +215,7 @@ public void testReInitializeTasks() throws Exception { long startTime = System.currentTimeMillis(); reconTaskController.registerTask(reconOmTaskMock); - reconTaskController.reInitializeTasks(omMetadataManagerMock); + reconTaskController.reInitializeTasks(omMetadataManagerMock, null); long endTime = System.currentTimeMillis(); verify(reconOmTaskMock, times(1)) From dfb44403f329713ff5f063f3dc0ce9fce1278176 Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Mon, 24 Feb 2025 18:06:34 +0530 Subject: [PATCH 2/4] HDDS-12377. Fixed import checkstyle issue. --- .../hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java index 38724a5b757d..36e7c80307e4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java @@ -30,9 +30,9 @@ import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.FileOutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; From 4a02c8252db5acea46f5110b9251e4e19db7428b Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Tue, 4 Mar 2025 11:27:21 +0530 Subject: [PATCH 3/4] HDDS-12377. Added isEmpty check and javadoc. --- .../impl/OzoneManagerServiceProviderImpl.java | 20 ++++++++++--------- .../recon/tasks/ReconTaskController.java | 11 +++++++--- .../recon/tasks/ReconTaskControllerImpl.java | 13 +++++++++++- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index d9cdf3479bd5..dfdfe2923229 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -279,7 +279,6 @@ public void start() { .stream() .filter(entry -> { String taskName = entry.getKey(); - ReconOmTask task = entry.getValue(); ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1 @@ -287,14 +286,17 @@ public void start() { .equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2 }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map - LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number not matching with " + - "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n"); - LOG.info(deltaTaskStatusUpdater.getTaskName() + "->" + deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); - reconOmTaskMap.keySet() - .forEach(taskName -> { - LOG.info(taskName + "->" + taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); - - }); + if (!reconOmTaskMap.isEmpty()) { + LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number not matching with " + + "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n"); + LOG.info("{}->{}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + reconOmTaskMap.keySet() + .forEach(taskName -> { + LOG.info("{}->{}", taskName, + taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); + + }); + } reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap); startSyncDataFromOM(initialDelay); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java index c2b6fd0bc0d5..f5e1dc39eba5 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -40,10 +40,15 @@ void consumeOMEvents(OMUpdateEventBatch events, OMMetadataManager omMetadataManager); /** - * Pass on the handle to a new OM DB instance to the registered tasks. + * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance. * - * @param omMetadataManager OM Metadata Manager instance - * @param reconOmTaskMap + * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization. + * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match + * the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task. + * These tasks will be reinitialized to process the delta OM DB updates + * received in the last run of 'OmDeltaRequest'. + * If {@code reconOmTaskMap} is null, all registered Recon OM tasks + * will be reinitialized. */ void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map reconOmTaskMap); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index d4dcc735bb8b..966ee95f0b3d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -157,12 +157,23 @@ private void ignoreFailedTasks(List failedTasks) { } } + /** + * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance. + * + * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization. + * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match + * the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task. + * These tasks will be reinitialized to process the delta OM DB updates + * received in the last run of 'OmDeltaRequest'. + * If {@code reconOmTaskMap} is null, all registered Recon OM tasks + * will be reinitialized. + */ @Override public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map reconOmTaskMap) { Collection> tasks = new ArrayList<>(); Map localReconOmTaskMap = reconOmTaskMap; - if (null == reconOmTaskMap) { + if (reconOmTaskMap == null) { localReconOmTaskMap = reconOmTasks; } ReconConstants.resetTableTruncatedFlags(); From fbb90c4c6816a208b02e75e8f272db433f16b1ef Mon Sep 17 00:00:00 2001 From: deveshsingh Date: Wed, 5 Mar 2025 19:26:37 +0530 Subject: [PATCH 4/4] HDDS-12377. Updated javadoc and log statement. --- .../recon/spi/impl/OzoneManagerServiceProviderImpl.java | 8 ++++---- .../hadoop/ozone/recon/tasks/ReconTaskController.java | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index dfdfe2923229..455dabf6ac04 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -287,12 +287,12 @@ public void start() { }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map if (!reconOmTaskMap.isEmpty()) { - LOG.info("Task details of such tasks whose lastUpdatedSeqNumber number not matching with " + - "lastUpdatedSeqNumber of 'OmDeltaRequest' task::\n"); - LOG.info("{}->{}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); + LOG.info("Task name and last updated sequence number of tasks, that are not matching with " + + "the last updated sequence number of OmDeltaRequest task:\n"); + LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); reconOmTaskMap.keySet() .forEach(taskName -> { - LOG.info("{}->{}", taskName, + LOG.info("{} -> {}", taskName, taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber()); }); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java index f5e1dc39eba5..8d956f487b95 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java @@ -43,10 +43,7 @@ void consumeOMEvents(OMUpdateEventBatch events, * Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance. * * @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization. - * @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match - * the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task. - * These tasks will be reinitialized to process the delta OM DB updates - * received in the last run of 'OmDeltaRequest'. + * @param reconOmTaskMap a map of Recon OM tasks, which we would like to reinitialize. * If {@code reconOmTaskMap} is null, all registered Recon OM tasks * will be reinitialized. */