Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -267,6 +268,36 @@ public void start() {
reconTaskController.getRegisteredTasks()
.values()
.forEach(ReconOmTask::init);

// Verify if 'OmDeltaRequest' task's lastUpdatedSeqNumber number not matching with
// lastUpdatedSeqNumber number for any of the OM task, then just run reprocess for such tasks.
ReconTaskStatusUpdater deltaTaskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(OmSnapshotTaskName.OmDeltaRequest.name());

Map<String, ReconOmTask> reconOmTaskMap = reconTaskController.getRegisteredTasks()
.entrySet()
.stream()
.filter(entry -> {
String taskName = entry.getKey();
ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName);

return !taskName.equals(OmSnapshotTaskName.OmDeltaRequest.name()) && // Condition 1
!taskStatusUpdater.getLastUpdatedSeqNumber()
.equals(deltaTaskStatusUpdater.getLastUpdatedSeqNumber()); // Condition 2
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); // Collect into desired Map
if (!reconOmTaskMap.isEmpty()) {
LOG.info("Task name and last updated sequence number of tasks, that are not matching with " +
"the last updated sequence number of OmDeltaRequest task:\n");
LOG.info("{} -> {}", deltaTaskStatusUpdater.getTaskName(), deltaTaskStatusUpdater.getLastUpdatedSeqNumber());
reconOmTaskMap.keySet()
.forEach(taskName -> {
LOG.info("{} -> {}", taskName,
taskStatusUpdaterManager.getTaskStatusUpdater(taskName).getLastUpdatedSeqNumber());

});
}
reconTaskController.reInitializeTasks(omMetadataManager, reconOmTaskMap);
startSyncDataFromOM(initialDelay);
}

Expand Down Expand Up @@ -633,7 +664,7 @@ public boolean syncDataFromOM() {

// Reinitialize tasks that are listening.
LOG.info("Calling reprocess on Recon tasks.");
reconTaskController.reInitializeTasks(omMetadataManager);
reconTaskController.reInitializeTasks(omMetadataManager, null);

// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager);

/**
* Pass on the handle to a new OM DB instance to the registered tasks.
* @param omMetadataManager OM Metadata Manager instance
* Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance.
*
* @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization.
* @param reconOmTaskMap a map of Recon OM tasks, which we would like to reinitialize.
* If {@code reconOmTaskMap} is null, all registered Recon OM tasks
* will be reinitialized.
*/
void reInitializeTasks(ReconOMMetadataManager omMetadataManager);
void reInitializeTasks(ReconOMMetadataManager omMetadataManager, Map<String, ReconOmTask> reconOmTaskMap);

/**
* Get set of registered tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,32 @@ private void ignoreFailedTasks(List<ReconOmTask.TaskResult> failedTasks) {
}
}

/**
* Reinitializes the registered Recon OM tasks with a new OM Metadata Manager instance.
*
* @param omMetadataManager the OM Metadata Manager instance to be used for reinitialization.
* @param reconOmTaskMap a map of Recon OM tasks whose lastUpdatedSeqNumber does not match
* the lastUpdatedSeqNumber from the previous run of the 'OmDeltaRequest' task.
* These tasks will be reinitialized to process the delta OM DB updates
* received in the last run of 'OmDeltaRequest'.
* If {@code reconOmTaskMap} is null, all registered Recon OM tasks
* will be reinitialized.
*/
@Override
public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) {
public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager,
Map<String, ReconOmTask> reconOmTaskMap) {
Collection<NamedCallableTask<ReconOmTask.TaskResult>> tasks = new ArrayList<>();
Map<String, ReconOmTask> localReconOmTaskMap = reconOmTaskMap;
if (reconOmTaskMap == null) {
localReconOmTaskMap = reconOmTasks;
}
ReconConstants.resetTableTruncatedFlags();
for (Map.Entry<String, ReconOmTask> taskEntry :
reconOmTasks.entrySet()) {
ReconOmTask task = taskEntry.getValue();
ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());

localReconOmTaskMap.values().forEach(task -> {
ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName());
taskStatusUpdater.recordRunStart();
tasks.add(new NamedCallableTask<>(task.getTaskName(),
() -> task.reprocess(omMetadataManager)));
}
tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager)));
});

try {
CompletableFuture.allOf(tasks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public void setIsCurrentTaskRunning(int isCurrentTaskRunning) {
this.reconTaskStatus.setIsCurrentTaskRunning(isCurrentTaskRunning);
}

public Long getLastUpdatedSeqNumber() {
return this.reconTaskStatus.getLastUpdatedSeqNumber();
}

public String getTaskName() {
return this.taskName;
}

/**
* Helper function to update TASK_STATUS table with task start values.
* Set the isCurrentTaskRunning as true, update the timestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public void testSyncDataFromOMFullSnapshot(

ReconTaskController reconTaskControllerMock = getMockTaskController();
doNothing().when(reconTaskControllerMock)
.reInitializeTasks(omMetadataManager);
.reInitializeTasks(omMetadataManager, null);
ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager();

OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
Expand All @@ -494,7 +494,7 @@ public void testSyncDataFromOMFullSnapshot(
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(taskNameCaptor.capture());
assertEquals(OmSnapshotRequest.name(), taskNameCaptor.getValue());
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager);
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
}

Expand Down Expand Up @@ -544,7 +544,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE(

ReconTaskController reconTaskControllerMock = getMockTaskController();
doNothing().when(reconTaskControllerMock)
.reInitializeTasks(omMetadataManager);
.reInitializeTasks(omMetadataManager, null);
ReconTaskStatusUpdaterManager reconTaskStatusUpdaterManager = getMockTaskStatusUpdaterManager();

OzoneManagerProtocol protocol = getMockOzoneManagerClientWithThrow();
Expand All @@ -562,7 +562,7 @@ public void testSyncDataFromOMFullSnapshotForSNNFE(
verify(reconTaskStatusUpdaterManager).getTaskStatusUpdater(captor.capture());
assertEquals(OmSnapshotRequest.name(), captor.getValue());
verify(reconTaskControllerMock, times(1))
.reInitializeTasks(omMetadataManager);
.reInitializeTasks(omMetadataManager, null);
assertEquals(1, metrics.getNumSnapshotRequests());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void testReInitializeTasks() throws Exception {

long startTime = System.currentTimeMillis();
reconTaskController.registerTask(reconOmTaskMock);
reconTaskController.reInitializeTasks(omMetadataManagerMock);
reconTaskController.reInitializeTasks(omMetadataManagerMock, null);
long endTime = System.currentTimeMillis();

verify(reconOmTaskMock, times(1))
Expand Down