diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index abe85e476394..a09626833765 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -501,7 +501,9 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc } for (byte[] data : dbUpdates.getData()) { try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(data)) { + // Events get populated in the events list in OMDBUpdatesHandler via callbacks for put/delete/update. writeBatch.iterate(omdbUpdatesHandler); + // Commit the OM DB transactions to the Recon RocksDB and sync here. try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(writeBatch)) { try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java index 6e6390d32484..e42e021b9e45 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java @@ -34,8 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.Table; @@ -82,7 +80,7 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager * (container, key) -> count to Recon Container DB. */ @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { long omKeyCount = 0; // In-memory maps for fast look up and batch write @@ -118,7 +116,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { containerKeyCountMap); if (!checkAndCallFlushToDB(containerKeyMap)) { LOG.error("Unable to flush containerKey information to the DB"); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } omKeyCount++; } @@ -131,7 +129,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { containerKeyCountMap)) { LOG.error("Unable to flush Container Key Count and " + "remaining Container Key information to the DB"); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask."); @@ -142,9 +140,9 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } catch (IOException ioEx) { LOG.error("Unable to populate Container Key data in Recon DB. 
", ioEx); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } private boolean flushAndCommitContainerKeyInfoToDB( @@ -189,7 +187,8 @@ public Collection getTaskTables() { } @Override - public Pair process(OMUpdateEventBatch events) { + public TaskResult process(OMUpdateEventBatch events, + Map subTaskSeekPosMap) { Iterator eventIterator = events.getIterator(); int eventCount = 0; final Collection taskTables = getTaskTables(); @@ -246,18 +245,18 @@ public Pair process(OMUpdateEventBatch events) { } catch (IOException e) { LOG.error("Unexpected exception while updating key data : {} ", updatedKey, e); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } } try { writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList); } catch (IOException e) { LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.", getTaskName(), eventCount, (System.currentTimeMillis() - startTime)); - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } private void writeToTheDB(Map containerKeyMap, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java deleted file mode 100644 index 67da0d6c785d..000000000000 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.recon.tasks; - -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE; -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; -import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE; - -import com.google.inject.Inject; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.ozone.om.OMMetadataManager; -import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.recon.ReconUtils; -import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; -import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao; -import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize; -import org.jooq.DSLContext; -import org.jooq.Record3; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class to iterate over the OM DB and store the counts of existing/new - * files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon - * fileSize DB. - */ -public class FileSizeCountTask implements ReconOmTask { - private static final Logger LOG = - LoggerFactory.getLogger(FileSizeCountTask.class); - - private FileCountBySizeDao fileCountBySizeDao; - private DSLContext dslContext; - - @Inject - public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao, - UtilizationSchemaDefinition - utilizationSchemaDefinition) { - this.fileCountBySizeDao = fileCountBySizeDao; - this.dslContext = utilizationSchemaDefinition.getDSLContext(); - } - - /** - * Read the Keys from OM snapshot DB and calculate the upper bound of - * File Size it belongs to. - * - * @param omMetadataManager OM Metadata instance. 
- * @return Pair - */ - @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { - // Map to store the count of files based on file size - Map fileSizeCountMap = new HashMap<>(); - - // Delete all records from FILE_COUNT_BY_SIZE table - int execute = dslContext.delete(FILE_COUNT_BY_SIZE).execute(); - LOG.debug("Deleted {} records from {}", execute, FILE_COUNT_BY_SIZE); - - // Call reprocessBucket method for FILE_SYSTEM_OPTIMIZED bucket layout - boolean statusFSO = - reprocessBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED, - omMetadataManager, - fileSizeCountMap); - // Call reprocessBucket method for LEGACY bucket layout - boolean statusOBS = - reprocessBucketLayout(BucketLayout.LEGACY, omMetadataManager, - fileSizeCountMap); - if (!statusFSO && !statusOBS) { - return new ImmutablePair<>(getTaskName(), false); - } - writeCountsToDB(fileSizeCountMap); - LOG.debug("Completed a 'reprocess' run of FileSizeCountTask."); - return new ImmutablePair<>(getTaskName(), true); - } - - private boolean reprocessBucketLayout(BucketLayout bucketLayout, - OMMetadataManager omMetadataManager, - Map fileSizeCountMap) { - Table omKeyInfoTable = - omMetadataManager.getKeyTable(bucketLayout); - try (TableIterator> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue kv = keyIter.next(); - handlePutKeyEvent(kv.getValue(), fileSizeCountMap); - // The time complexity of .size() method is constant time, O(1) - if (fileSizeCountMap.size() >= 100000) { - writeCountsToDB(fileSizeCountMap); - fileSizeCountMap.clear(); - } - } - } catch (IOException ioEx) { - LOG.error("Unable to populate File Size Count for " + bucketLayout + - " in Recon DB. ", ioEx); - return false; - } - return true; - } - - @Override - public String getTaskName() { - return "FileSizeCountTask"; - } - - public Collection getTaskTables() { - List taskTables = new ArrayList<>(); - taskTables.add(KEY_TABLE); - taskTables.add(FILE_TABLE); - return taskTables; - } - - /** - * Read the Keys from update events and update the count of files - * pertaining to a certain upper bound. - * - * @param events Update events - PUT/DELETE. 
- * @return Pair - */ - @Override - public Pair process(OMUpdateEventBatch events) { - Iterator eventIterator = events.getIterator(); - Map fileSizeCountMap = new HashMap<>(); - final Collection taskTables = getTaskTables(); - - long startTime = System.currentTimeMillis(); - while (eventIterator.hasNext()) { - OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); - // Filter event inside process method to avoid duping - if (!taskTables.contains(omdbUpdateEvent.getTable())) { - continue; - } - String updatedKey = omdbUpdateEvent.getKey(); - Object value = omdbUpdateEvent.getValue(); - Object oldValue = omdbUpdateEvent.getOldValue(); - - if (value instanceof OmKeyInfo) { - OmKeyInfo omKeyInfo = (OmKeyInfo) value; - OmKeyInfo omKeyInfoOld = (OmKeyInfo) oldValue; - - try { - switch (omdbUpdateEvent.getAction()) { - case PUT: - handlePutKeyEvent(omKeyInfo, fileSizeCountMap); - break; - - case DELETE: - handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap); - break; - - case UPDATE: - if (omKeyInfoOld != null) { - handleDeleteKeyEvent(updatedKey, omKeyInfoOld, fileSizeCountMap); - handlePutKeyEvent(omKeyInfo, fileSizeCountMap); - } else { - LOG.warn("Update event does not have the old keyInfo for {}.", - updatedKey); - } - break; - - default: - LOG.trace("Skipping DB update event : {}", - omdbUpdateEvent.getAction()); - } - } catch (Exception e) { - LOG.error("Unexpected exception while processing key {}.", - updatedKey, e); - return new ImmutablePair<>(getTaskName(), false); - } - } else { - LOG.warn("Unexpected value type {} for key {}. Skipping processing.", - value.getClass().getName(), updatedKey); - } - } - writeCountsToDB(fileSizeCountMap); - LOG.debug("{} successfully processed in {} milliseconds", - getTaskName(), (System.currentTimeMillis() - startTime)); - return new ImmutablePair<>(getTaskName(), true); - } - - /** - * Populate DB with the counts of file sizes calculated - * using the dao. - * - */ - private void writeCountsToDB(Map fileSizeCountMap) { - - List insertToDb = new ArrayList<>(); - List updateInDb = new ArrayList<>(); - boolean isDbTruncated = isFileCountBySizeTableEmpty(); // Check if table is empty - - fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> { - FileCountBySize newRecord = new FileCountBySize(); - newRecord.setVolume(key.volume); - newRecord.setBucket(key.bucket); - newRecord.setFileSize(key.fileSizeUpperBound); - newRecord.setCount(fileSizeCountMap.get(key)); - if (!isDbTruncated) { - // Get the current count from database and update - Record3 recordToFind = - dslContext.newRecord( - FILE_COUNT_BY_SIZE.VOLUME, - FILE_COUNT_BY_SIZE.BUCKET, - FILE_COUNT_BY_SIZE.FILE_SIZE) - .value1(key.volume) - .value2(key.bucket) - .value3(key.fileSizeUpperBound); - FileCountBySize fileCountRecord = - fileCountBySizeDao.findById(recordToFind); - if (fileCountRecord == null && newRecord.getCount() > 0L) { - // insert new row only for non-zero counts. - insertToDb.add(newRecord); - } else if (fileCountRecord != null) { - newRecord.setCount(fileCountRecord.getCount() + - fileSizeCountMap.get(key)); - updateInDb.add(newRecord); - } - } else if (newRecord.getCount() > 0) { - // insert new row only for non-zero counts. 
- insertToDb.add(newRecord); - } - }); - fileCountBySizeDao.insert(insertToDb); - fileCountBySizeDao.update(updateInDb); - } - - private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) { - return new FileSizeCountKey(omKeyInfo.getVolumeName(), - omKeyInfo.getBucketName(), - ReconUtils.getFileSizeUpperBound(omKeyInfo.getDataSize())); - } - - /** - * Calculate and update the count of files being tracked by - * fileSizeCountMap. - * Used by reprocess() and process(). - * - * @param omKeyInfo OmKey being updated for count - */ - private void handlePutKeyEvent(OmKeyInfo omKeyInfo, - Map fileSizeCountMap) { - FileSizeCountKey key = getFileSizeCountKey(omKeyInfo); - Long count = fileSizeCountMap.containsKey(key) ? - fileSizeCountMap.get(key) + 1L : 1L; - fileSizeCountMap.put(key, count); - } - - private BucketLayout getBucketLayout() { - return BucketLayout.DEFAULT; - } - - /** - * Calculate and update the count of files being tracked by - * fileSizeCountMap. - * Used by reprocess() and process(). - * - * @param omKeyInfo OmKey being updated for count - */ - private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo, - Map - fileSizeCountMap) { - if (omKeyInfo == null) { - LOG.warn("Deleting a key not found while handling DELETE key event. Key" + - " not found in Recon OM DB : {}", key); - } else { - FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo); - Long count = fileSizeCountMap.containsKey(countKey) ? - fileSizeCountMap.get(countKey) - 1L : -1L; - fileSizeCountMap.put(countKey, count); - } - } - - /** - * Checks if the FILE_COUNT_BY_SIZE table is empty. - * - * @return true if the table is empty, false otherwise. - */ - private boolean isFileCountBySizeTableEmpty() { - return dslContext.fetchCount(FILE_COUNT_BY_SIZE) == 0; - } - - private static class FileSizeCountKey { - private String volume; - private String bucket; - private Long fileSizeUpperBound; - - FileSizeCountKey(String volume, String bucket, - Long fileSizeUpperBound) { - this.volume = volume; - this.bucket = bucket; - this.fileSizeUpperBound = fileSizeUpperBound; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof FileSizeCountKey) { - FileSizeCountKey s = (FileSizeCountKey) obj; - return volume.equals(s.volume) && bucket.equals(s.bucket) && - fileSizeUpperBound.equals(s.fileSizeUpperBound); - } - return false; - } - - @Override - public int hashCode() { - return (volume + bucket + fileSizeUpperBound).hashCode(); - } - } -} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java index f40a859590b0..a411444780ae 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Map; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -42,7 +42,7 @@ public FileSizeCountTaskFSO(FileCountBySizeDao fileCountBySizeDao, } @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { return FileSizeCountTaskHelper.reprocess( omMetadataManager, 
dslContext, @@ -53,7 +53,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } @Override - public Pair process(OMUpdateEventBatch events) { + public TaskResult process(OMUpdateEventBatch events, Map subTaskSeekPosMap) { // This task listens only on the FILE_TABLE. return FileSizeCountTaskHelper.processEvents( events, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java index 489449d6a98e..406ad2e953a3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java @@ -25,8 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -85,11 +83,11 @@ public static void truncateTableIfNeeded(DSLContext dslContext) { * @param taskName The name of the task for logging. * @return A Pair of task name and boolean indicating success. */ - public static Pair reprocess(OMMetadataManager omMetadataManager, - DSLContext dslContext, - FileCountBySizeDao fileCountBySizeDao, - BucketLayout bucketLayout, - String taskName) { + public static ReconOmTask.TaskResult reprocess(OMMetadataManager omMetadataManager, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao, + BucketLayout bucketLayout, + String taskName) { LOG.info("Starting Reprocess for {}", taskName); Map fileSizeCountMap = new HashMap<>(); long startTime = System.currentTimeMillis(); @@ -97,12 +95,12 @@ public static Pair reprocess(OMMetadataManager omMetadataManage boolean status = reprocessBucketLayout( bucketLayout, omMetadataManager, fileSizeCountMap, dslContext, fileCountBySizeDao, taskName); if (!status) { - return new ImmutablePair<>(taskName, false); + return buildTaskResult(taskName, false); } writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao); long endTime = System.currentTimeMillis(); LOG.info("{} completed Reprocess in {} ms.", taskName, (endTime - startTime)); - return new ImmutablePair<>(taskName, true); + return buildTaskResult(taskName, true); } /** @@ -155,11 +153,11 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout, * @param taskName The name of the task for logging. * @return A Pair of task name and boolean indicating success. */ - public static Pair processEvents(OMUpdateEventBatch events, - String tableName, - DSLContext dslContext, - FileCountBySizeDao fileCountBySizeDao, - String taskName) { + public static ReconOmTask.TaskResult processEvents(OMUpdateEventBatch events, + String tableName, + DSLContext dslContext, + FileCountBySizeDao fileCountBySizeDao, + String taskName) { Iterator eventIterator = events.getIterator(); Map fileSizeCountMap = new HashMap<>(); long startTime = System.currentTimeMillis(); @@ -195,7 +193,7 @@ public static Pair processEvents(OMUpdateEventBatch events, } } catch (Exception e) { LOG.error("Unexpected exception while processing key {}.", updatedKey, e); - return new ImmutablePair<>(taskName, false); + return buildTaskResult(taskName, false); } } else { LOG.warn("Unexpected value type {} for key {}. 
Skipping processing.", @@ -205,7 +203,7 @@ public static Pair processEvents(OMUpdateEventBatch events, writeCountsToDB(fileSizeCountMap, dslContext, fileCountBySizeDao); LOG.debug("{} successfully processed in {} milliseconds", taskName, (System.currentTimeMillis() - startTime)); - return new ImmutablePair<>(taskName, true); + return buildTaskResult(taskName, true); } /** @@ -328,4 +326,11 @@ public int hashCode() { return (volume + bucket + fileSizeUpperBound).hashCode(); } } + + public static ReconOmTask.TaskResult buildTaskResult(String taskName, boolean success) { + return new ReconOmTask.TaskResult.Builder() + .setTaskName(taskName) + .setTaskSuccess(success) + .build(); + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java index acaab763ac0a..05cd0e166998 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Map; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -42,7 +42,7 @@ public FileSizeCountTaskOBS(FileCountBySizeDao fileCountBySizeDao, } @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { return FileSizeCountTaskHelper.reprocess( omMetadataManager, dslContext, @@ -53,7 +53,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } @Override - public Pair process(OMUpdateEventBatch events) { + public TaskResult process(OMUpdateEventBatch events, Map subTaskSeekPosMap) { // This task listens only on the KEY_TABLE. 
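// (LEGACY and OBS keys are stored in the KEY_TABLE; FSO files live in the FILE_TABLE, which FileSizeCountTaskFSO above listens on.)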
return FileSizeCountTaskHelper.processEvents( events, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java index 215080035080..aa2a94caf1bf 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java @@ -17,11 +17,16 @@ package org.apache.hadoop.ozone.recon.tasks; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -30,7 +35,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -39,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Task to query data from OMDB and write into Recon RocksDB. * Reprocess() will take a snapshots on OMDB, and iterate the keyTable, @@ -69,7 +74,6 @@ public class NSSummaryTask implements ReconOmTask { private final NSSummaryTaskWithFSO nsSummaryTaskWithFSO; private final NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy; private final NSSummaryTaskWithOBS nsSummaryTaskWithOBS; - private final OzoneConfiguration ozoneConfiguration; @Inject public NSSummaryTask(ReconNamespaceSummaryManager @@ -80,16 +84,19 @@ public NSSummaryTask(ReconNamespaceSummaryManager ozoneConfiguration) { this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; this.reconOMMetadataManager = reconOMMetadataManager; - this.ozoneConfiguration = ozoneConfiguration; + long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong( + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO( - reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconNamespaceSummaryManager, reconOMMetadataManager, + ozoneConfiguration, nsSummaryFlushToDBMaxThreshold); this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy( - reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconNamespaceSummaryManager, reconOMMetadataManager, + ozoneConfiguration, nsSummaryFlushToDBMaxThreshold); this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS( - reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconNamespaceSummaryManager, reconOMMetadataManager, + ozoneConfiguration, nsSummaryFlushToDBMaxThreshold); } @Override @@ -97,28 +104,68 @@ public String getTaskName() { return "NSSummaryTask"; } + /** + * Bucket Type Enum which mimics subtasks for their data processing. 
+ */ + public enum BucketType { + FSO("File System Optimized Bucket"), + OBS("Object Store Bucket"), + LEGACY("Legacy Bucket"); + + private final String description; + + BucketType(String description) { + this.description = description; + } + + public String getDescription() { + return description; + } + } + @Override - public Pair process(OMUpdateEventBatch events) { - long startTime = System.currentTimeMillis(); - boolean success = nsSummaryTaskWithFSO.processWithFSO(events); - if (!success) { + public TaskResult process( + OMUpdateEventBatch events, Map subTaskSeekPosMap) { + boolean anyFailure = false; // Track if any bucket fails + Map updatedSeekPositions = new HashMap<>(); + + // Process FSO bucket + Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0); + Pair bucketResult = nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek); + updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft()); + if (!bucketResult.getRight()) { LOG.error("processWithFSO failed."); + anyFailure = true; } - success = nsSummaryTaskWithLegacy.processWithLegacy(events); - if (!success) { + + // Process Legacy bucket + bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0); + bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events, bucketSeek); + updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft()); + if (!bucketResult.getRight()) { LOG.error("processWithLegacy failed."); + anyFailure = true; } - success = nsSummaryTaskWithOBS.processWithOBS(events); - if (!success) { + + // Process OBS bucket + bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0); + bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek); + updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft()); + if (!bucketResult.getRight()) { LOG.error("processWithOBS failed."); + anyFailure = true; } - LOG.debug("{} successfully processed in {} milliseconds", - getTaskName(), (System.currentTimeMillis() - startTime)); - return new ImmutablePair<>(getTaskName(), success); + + // Return task failure if any bucket failed, while keeping each bucket's latest seek position + return new TaskResult.Builder() + .setTaskName(getTaskName()) + .setSubTaskSeekPositions(updatedSeekPositions) + .setTaskSuccess(!anyFailure) + .build(); } @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { // Initialize a list of tasks to run in parallel Collection> tasks = new ArrayList<>(); @@ -130,7 +177,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } catch (IOException ioEx) { LOG.error("Unable to clear NSSummary table in Recon DB. 
", ioEx); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } tasks.add(() -> nsSummaryTaskWithFSO @@ -150,15 +197,12 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { results = executorService.invokeAll(tasks); for (int i = 0; i < results.size(); i++) { if (results.get(i).get().equals(false)) { - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } } - } catch (InterruptedException ex) { + } catch (InterruptedException | ExecutionException ex) { LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex); - return new ImmutablePair<>(getTaskName(), false); - } catch (ExecutionException ex2) { - LOG.error("Error while reprocessing NSSummary table in Recon DB.", ex2); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } finally { executorService.shutdown(); @@ -171,7 +215,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { LOG.debug("Task execution time: {} milliseconds", durationInMillis); } - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java index da223665c381..4b0b8514904d 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java @@ -71,15 +71,16 @@ public ReconOMMetadataManager getReconOMMetadataManager() { protected void writeNSSummariesToDB(Map nsSummaryMap) throws IOException { try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { - nsSummaryMap.keySet().forEach((Long key) -> { + for (Map.Entry entry : nsSummaryMap.entrySet()) { try { reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation, - key, nsSummaryMap.get(key)); + entry.getKey(), entry.getValue()); } catch (IOException e) { LOG.error("Unable to write Namespace Summary data in Recon DB.", e); + throw e; } - }); + } reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation); } } @@ -201,20 +202,11 @@ protected void handleDeleteDirEvent(OmDirectoryInfo directoryInfo, protected boolean flushAndCommitNSToDB(Map nsSummaryMap) { try { writeNSSummariesToDB(nsSummaryMap); - nsSummaryMap.clear(); } catch (IOException e) { LOG.error("Unable to write Namespace Summary data in Recon DB.", e); return false; - } - return true; - } - - protected boolean checkAndCallFlushToDB( - Map nsSummaryMap) { - // if map contains more than entries, flush to DB and clear the map - if (null != nsSummaryMap && nsSummaryMap.size() >= - nsSummaryFlushToDBMaxThreshold) { - return flushAndCommitNSToDB(nsSummaryMap); + } finally { + nsSummaryMap.clear(); } return true; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java index 030574e0d6dd..6ebc36331a5e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import 
org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; @@ -47,14 +49,18 @@ public class NSSummaryTaskWithFSO extends NSSummaryTaskDbEventHandler { private static final Logger LOG = LoggerFactory.getLogger(NSSummaryTaskWithFSO.class); + private final long nsSummaryFlushToDBMaxThreshold; + public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconOMMetadataManager reconOMMetadataManager, OzoneConfiguration - ozoneConfiguration) { + ozoneConfiguration, + long nsSummaryFlushToDBMaxThreshold) { super(reconNamespaceSummaryManager, reconOMMetadataManager, ozoneConfiguration); + this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold; } // We only listen to updates from FSO-enabled KeyTable(FileTable) and DirTable @@ -62,15 +68,23 @@ public Collection getTaskTables() { return Arrays.asList(FILE_TABLE, DIRECTORY_TABLE); } - public boolean processWithFSO(OMUpdateEventBatch events) { + public Pair processWithFSO(OMUpdateEventBatch events, + int seekPos) { Iterator eventIterator = events.getIterator(); + int itrPos = 0; + while (eventIterator.hasNext() && itrPos < seekPos) { + eventIterator.next(); + itrPos++; + } final Collection taskTables = getTaskTables(); Map nsSummaryMap = new HashMap<>(); + int eventCounter = 0; while (eventIterator.hasNext()) { OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction(); + eventCounter++; // we only process updates on OM's FileTable and Dirtable String table = omdbUpdateEvent.getTable(); @@ -149,20 +163,23 @@ public boolean processWithFSO(OMUpdateEventBatch events) { } catch (IOException ioEx) { LOG.error("Unable to process Namespace Summary data in Recon DB. 
", ioEx); - return false; + nsSummaryMap.clear(); + return new ImmutablePair<>(seekPos, false); } - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return new ImmutablePair<>(seekPos, false); + } + seekPos = eventCounter + 1; } } // flush and commit left out entries at end if (!flushAndCommitNSToDB(nsSummaryMap)) { - return false; + return new ImmutablePair<>(seekPos, false); } - LOG.debug("Completed a process run of NSSummaryTaskWithFSO"); - return true; + return new ImmutablePair<>(seekPos, true); } public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) { @@ -178,8 +195,10 @@ public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) { Table.KeyValue kv = dirTableIter.next(); OmDirectoryInfo directoryInfo = kv.getValue(); handlePutDirEvent(directoryInfo, nsSummaryMap); - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return false; + } } } } @@ -194,8 +213,10 @@ public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) { Table.KeyValue kv = keyTableIter.next(); OmKeyInfo keyInfo = kv.getValue(); handlePutKeyEvent(keyInfo, nsSummaryMap); - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return false; + } } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java index cf29f23813a4..a14600391765 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithLegacy.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; @@ -51,24 +53,35 @@ public class NSSummaryTaskWithLegacy extends NSSummaryTaskDbEventHandler { private static final Logger LOG = LoggerFactory.getLogger(NSSummaryTaskWithLegacy.class); - private boolean enableFileSystemPaths; + private final boolean enableFileSystemPaths; + private final long nsSummaryFlushToDBMaxThreshold; public NSSummaryTaskWithLegacy(ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconOMMetadataManager reconOMMetadataManager, OzoneConfiguration - ozoneConfiguration) { + ozoneConfiguration, + long nsSummaryFlushToDBMaxThreshold) { super(reconNamespaceSummaryManager, reconOMMetadataManager, ozoneConfiguration); // true if FileSystemPaths enabled enableFileSystemPaths = ozoneConfiguration .getBoolean(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, OmConfig.Defaults.ENABLE_FILESYSTEM_PATHS); + this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold; } - public boolean processWithLegacy(OMUpdateEventBatch events) { + public Pair processWithLegacy(OMUpdateEventBatch events, + int seekPos) { Iterator eventIterator = events.getIterator(); + int itrPos = 0; + while (eventIterator.hasNext() && itrPos < seekPos) { + eventIterator.next(); + itrPos++; + } + + int eventCounter 
= 0; Map nsSummaryMap = new HashMap<>(); ReconOMMetadataManager metadataManager = getReconOMMetadataManager(); @@ -76,6 +89,7 @@ public boolean processWithLegacy(OMUpdateEventBatch events) { OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction(); + eventCounter++; // we only process updates on OM's KeyTable String table = omdbUpdateEvent.getTable(); @@ -114,20 +128,24 @@ public boolean processWithLegacy(OMUpdateEventBatch events) { } catch (IOException ioEx) { LOG.error("Unable to process Namespace Summary data in Recon DB. ", ioEx); - return false; + nsSummaryMap.clear(); + return new ImmutablePair<>(seekPos, false); } - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return new ImmutablePair<>(seekPos, false); + } + seekPos = eventCounter + 1; } } // flush and commit left out entries at end if (!flushAndCommitNSToDB(nsSummaryMap)) { - return false; + return new ImmutablePair<>(seekPos, false); } LOG.debug("Completed a process run of NSSummaryTaskWithLegacy"); - return true; + return new ImmutablePair<>(seekPos, true); } private void processWithFileSystemLayout(OmKeyInfo updatedKeyInfo, @@ -278,14 +296,17 @@ public boolean reprocessWithLegacy(OMMetadataManager omMetadataManager) { setParentBucketId(keyInfo); handlePutKeyEvent(keyInfo, nsSummaryMap); } - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return false; + } } } } } catch (IOException ioEx) { LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ", ioEx); + nsSummaryMap.clear(); return false; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java index 7364639d47ab..e15cc2836fb2 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; @@ -47,13 +49,17 @@ public class NSSummaryTaskWithOBS extends NSSummaryTaskDbEventHandler { private static final Logger LOG = LoggerFactory.getLogger(NSSummaryTaskWithOBS.class); + private final long nsSummaryFlushToDBMaxThreshold; + public NSSummaryTaskWithOBS( ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconOMMetadataManager reconOMMetadataManager, - OzoneConfiguration ozoneConfiguration) { + OzoneConfiguration ozoneConfiguration, + long nsSummaryFlushToDBMaxThreshold) { super(reconNamespaceSummaryManager, reconOMMetadataManager, ozoneConfiguration); + this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold; } @@ -89,14 +95,17 @@ public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) { setKeyParentID(keyInfo); handlePutKeyEvent(keyInfo, nsSummaryMap); - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if 
(!flushAndCommitNSToDB(nsSummaryMap)) { + return false; + } } } } } catch (IOException ioEx) { LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ", ioEx); + nsSummaryMap.clear(); return false; } @@ -108,14 +117,23 @@ public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) { return true; } - public boolean processWithOBS(OMUpdateEventBatch events) { + public Pair processWithOBS(OMUpdateEventBatch events, + int seekPos) { Iterator eventIterator = events.getIterator(); Map nsSummaryMap = new HashMap<>(); + int itrPos = 0; + while (eventIterator.hasNext() && itrPos < seekPos) { + eventIterator.next(); + itrPos++; + } + + int eventCounter = 0; while (eventIterator.hasNext()) { OMDBUpdateEvent omdbUpdateEvent = eventIterator.next(); OMDBUpdateEvent.OMDBUpdateAction action = omdbUpdateEvent.getAction(); + eventCounter++; // We only process updates on OM's KeyTable String table = omdbUpdateEvent.getTable(); @@ -181,27 +199,27 @@ public boolean processWithOBS(OMUpdateEventBatch events) { default: LOG.debug("Skipping DB update event: {}", action); } - - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { + if (!flushAndCommitNSToDB(nsSummaryMap)) { + return new ImmutablePair<>(seekPos, false); + } + seekPos = eventCounter + 1; } } catch (IOException ioEx) { LOG.error("Unable to process Namespace Summary data in Recon DB. ", ioEx); - return false; - } - if (!checkAndCallFlushToDB(nsSummaryMap)) { - return false; + nsSummaryMap.clear(); + return new ImmutablePair<>(seekPos, false); } } // Flush and commit left-out entries at the end if (!flushAndCommitNSToDB(nsSummaryMap)) { - return false; + return new ImmutablePair<>(seekPos, false); } LOG.debug("Completed a process run of NSSummaryTaskWithOBS"); - return true; + return new ImmutablePair<>(seekPos, true); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java index 72d5906b9628..6019990158bf 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java @@ -36,8 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; @@ -99,7 +97,7 @@ public void init() { /** * Iterates the rows of each table in the OM snapshot DB and calculates the * counts and sizes for table data. - * + *
+ * <p>
* For tables that require data size calculation * (as returned by getTablesToCalculateSize), both the number of * records (count) and total data size of the records are calculated. @@ -109,7 +107,7 @@ public void init() { * @return Pair */ @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { init(); for (String tableName : tables) { Table table = omMetadataManager.getTable(tableName); @@ -131,7 +129,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } } catch (IOException ioEx) { LOG.error("Unable to populate Table Count in Recon DB.", ioEx); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } } // Write the data to the DB @@ -146,7 +144,7 @@ public Pair reprocess(OMMetadataManager omMetadataManager) { } LOG.debug("Completed a 'reprocess' run of OmTableInsightTask."); - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } @Override @@ -162,11 +160,13 @@ public Collection getTaskTables() { * Read the update events and update the count and sizes of respective object * (volume, bucket, key etc.) based on the action (put or delete). * - * @param events Update events - PUT, DELETE and UPDATE. + * @param events Update events - PUT, DELETE and UPDATE. + * @param subTaskSeekPosMap * @return Pair */ @Override - public Pair process(OMUpdateEventBatch events) { + public TaskResult process(OMUpdateEventBatch events, + Map subTaskSeekPosMap) { Iterator eventIterator = events.getIterator(); String tableName; @@ -201,7 +201,7 @@ public Pair process(OMUpdateEventBatch events) { LOG.error( "Unexpected exception while processing the table {}, Action: {}", tableName, omdbUpdateEvent.getAction(), e); - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } } // Write the updated count and size information to the database @@ -216,7 +216,7 @@ public Pair process(OMUpdateEventBatch events) { } LOG.debug("{} successfully processed in {} milliseconds", getTaskName(), (System.currentTimeMillis() - startTime)); - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } private void handlePutEvent(OMDBUpdateEvent event, diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java index 01079d529b9f..395fdf6b1e00 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconOmTask.java @@ -17,7 +17,8 @@ package org.apache.hadoop.ozone.recon.tasks; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Collections; +import java.util.Map; import org.apache.hadoop.ozone.om.OMMetadataManager; /** @@ -38,16 +39,109 @@ default void init() { } /** * Process a set of OM events on tables that the task is listening on. - * @param events Set of events to be processed by the task. - * @return Pair of task name -> task success. + * + * @param events The batch of OM update events to be processed. + * @param subTaskSeekPosMap A map containing the seek positions for + * each sub-task, indicating where processing should start. + * @return A {@link TaskResult} containing: + * - The task name. + * - A map of sub-task names to their respective seek positions. + * - A boolean indicating whether the task was successful. 
*/ - Pair process(OMUpdateEventBatch events); + TaskResult process(OMUpdateEventBatch events, + Map subTaskSeekPosMap); /** - * Process a on tables that the task is listening on. - * @param omMetadataManager OM Metadata manager instance. - * @return Pair of task name -> task success. + * Reprocesses full entries in Recon OM RocksDB tables that the task is listening to. + * + * @param omMetadataManager The OM Metadata Manager instance used for accessing metadata. + * @return A {@link TaskResult} containing: + * - The task name. + * - A map of sub-task names to their respective seek positions. + * - A boolean indicating whether the task was successful. */ - Pair reprocess(OMMetadataManager omMetadataManager); + TaskResult reprocess(OMMetadataManager omMetadataManager); + /** + * Represents the result of a task execution, including the task name, + * sub-task seek positions, and success status. + * + *

<p>This class is immutable and uses the Builder pattern for object creation.

+ */ + class TaskResult { + private final String taskName; + private final Map subTaskSeekPositions; + private final boolean taskSuccess; + + /** + * Private constructor to enforce the use of the {@link Builder}. + * + * @param builder The builder instance containing values for initialization. + */ + private TaskResult(Builder builder) { + this.taskName = builder.taskName; + this.subTaskSeekPositions = builder.subTaskSeekPositions != null + ? builder.subTaskSeekPositions + : Collections.emptyMap(); // Default value + this.taskSuccess = builder.taskSuccess; + } + + // Getters + public String getTaskName() { + return taskName; + } + + public Map getSubTaskSeekPositions() { + return subTaskSeekPositions; + } + + public boolean isTaskSuccess() { + return taskSuccess; + } + + /** + * Builder class for creating instances of {@link TaskResult}. + */ + public static class Builder { + private String taskName; + private Map subTaskSeekPositions = Collections.emptyMap(); // Default value + private boolean taskSuccess; + + public Builder setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public Builder setSubTaskSeekPositions(Map subTaskSeekPositions) { + this.subTaskSeekPositions = subTaskSeekPositions; + return this; + } + + public Builder setTaskSuccess(boolean taskSuccess) { + this.taskSuccess = taskSuccess; + return this; + } + + public TaskResult build() { + return new TaskResult(this); + } + } + + // toString Method for debugging + @Override + public String toString() { + return "TaskResult{" + + "taskName='" + taskName + '\'' + + ", subTaskSeekPositions=" + subTaskSeekPositions + + ", taskSuccess=" + taskSuccess + + '}'; + } + } + + default TaskResult buildTaskResult(boolean success) { + return new TaskResult.Builder() + .setTaskName(getTaskName()) + .setTaskSuccess(success) + .build(); + } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index c1d786db1a9d..e289b6ae15b8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,7 +37,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.ReconConstants; @@ -97,27 +97,27 @@ public void registerTask(ReconOmTask task) { @Override public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataManager omMetadataManager) { if (!events.isEmpty()) { - Collection>> tasks = new ArrayList<>(); - List failedTasks = new ArrayList<>(); + Collection> tasks = new ArrayList<>(); + List failedTasks = new ArrayList<>(); for (Map.Entry taskEntry : reconOmTasks.entrySet()) { ReconOmTask task = taskEntry.getValue(); ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); taskStatusUpdater.recordRunStart(); // events passed to process method is no longer filtered - tasks.add(new 
NamedCallableTask<>(task.getTaskName(), () -> task.process(events))); + tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.process(events, Collections.emptyMap()))); } processTasks(tasks, events, failedTasks); // Retry processing failed tasks - List retryFailedTasks = new ArrayList<>(); + List retryFailedTasks = new ArrayList<>(); if (!failedTasks.isEmpty()) { tasks.clear(); - for (String taskName : failedTasks) { - ReconOmTask task = reconOmTasks.get(taskName); + for (ReconOmTask.TaskResult taskResult : failedTasks) { + ReconOmTask task = reconOmTasks.get(taskResult.getTaskName()); // events passed to process method is no longer filtered tasks.add(new NamedCallableTask<>(task.getTaskName(), - () -> task.process(events))); + () -> task.process(events, taskResult.getSubTaskSeekPositions()))); } processTasks(tasks, events, retryFailedTasks); } @@ -126,12 +126,15 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa ReconConstants.resetTableTruncatedFlags(); if (!retryFailedTasks.isEmpty()) { tasks.clear(); - for (String taskName : failedTasks) { - ReconOmTask task = reconOmTasks.get(taskName); + for (ReconOmTask.TaskResult taskResult : failedTasks) { + ReconOmTask task = reconOmTasks.get(taskResult.getTaskName()); tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager))); } - List reprocessFailedTasks = new ArrayList<>(); + List reprocessFailedTasks = new ArrayList<>(); processTasks(tasks, events, reprocessFailedTasks); + // Here the assumption is that even if a full reprocess of the task also fails, + // then something is wrong in the Recon RocksDB obtained from OM, and it needs to be + // investigated. ignoreFailedTasks(reprocessFailedTasks); } } @@ -141,8 +144,9 @@ public synchronized void consumeOMEvents(OMUpdateEventBatch events, OMMetadataMa * Ignore tasks that failed reprocess step more than threshold times. * @param failedTasks list of failed tasks. 
*/ - private void ignoreFailedTasks(List failedTasks) { - for (String taskName : failedTasks) { + private void ignoreFailedTasks(List failedTasks) { + for (ReconOmTask.TaskResult taskResult : failedTasks) { + String taskName = taskResult.getTaskName(); LOG.info("Reprocess step failed for task {}.", taskName); if (taskFailureCounter.get(taskName).incrementAndGet() > TASK_FAILURE_THRESHOLD) { @@ -155,14 +159,16 @@ private void ignoreFailedTasks(List failedTasks) { @Override public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataManager) { - Collection>> tasks = new ArrayList<>(); + Collection> tasks = new ArrayList<>(); ReconConstants.resetTableTruncatedFlags(); for (Map.Entry taskEntry : reconOmTasks.entrySet()) { ReconOmTask task = taskEntry.getValue(); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); + ReconTaskStatusUpdater taskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(task.getTaskName()); taskStatusUpdater.recordRunStart(); - tasks.add(new NamedCallableTask<>(task.getTaskName(), () -> task.reprocess(omMetadataManager))); + tasks.add(new NamedCallableTask<>(task.getTaskName(), + () -> task.reprocess(omMetadataManager))); } try { @@ -178,14 +184,16 @@ public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataMana throw new TaskExecutionException(task.getTaskName(), e); } }, executorService).thenAccept(result -> { - String taskName = result.getLeft(); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); - if (!result.getRight()) { + String taskName = result.getTaskName(); + ReconTaskStatusUpdater taskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + if (!result.isTaskSuccess()) { LOG.error("Init failed for task {}.", taskName); taskStatusUpdater.setLastTaskRunStatus(-1); } else { taskStatusUpdater.setLastTaskRunStatus(0); - taskStatusUpdater.setLastUpdatedSeqNumber(omMetadataManager.getLastSequenceNumberFromDB()); + taskStatusUpdater.setLastUpdatedSeqNumber( + omMetadataManager.getLastSequenceNumberFromDB()); } taskStatusUpdater.recordRunCompletion(); }).exceptionally(ex -> { @@ -237,8 +245,9 @@ public synchronized void stop() { * @param events A batch of {@link OMUpdateEventBatch} events to fetch sequence number of last event in batch. 
* @param failedTasks Reference of the list to which we want to add the failed tasks for retry/reprocessing */ - private void processTasks(Collection>> tasks, - OMUpdateEventBatch events, List failedTasks) { + private void processTasks( + Collection> tasks, + OMUpdateEventBatch events, List failedTasks) { List> futures = tasks.stream() .map(task -> CompletableFuture.supplyAsync(() -> { try { @@ -251,11 +260,15 @@ private void processTasks(Collection>> t throw new TaskExecutionException(task.getTaskName(), e); } }, executorService).thenAccept(result -> { - String taskName = result.getLeft(); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); - if (!result.getRight()) { + String taskName = result.getTaskName(); + ReconTaskStatusUpdater taskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + if (!result.isTaskSuccess()) { LOG.error("Task {} failed", taskName); - failedTasks.add(result.getLeft()); + failedTasks.add(new ReconOmTask.TaskResult.Builder() + .setTaskName(taskName) + .setSubTaskSeekPositions(result.getSubTaskSeekPositions()) + .build()); taskStatusUpdater.setLastTaskRunStatus(-1); } else { taskFailureCounter.get(taskName).set(0); @@ -270,7 +283,8 @@ private void processTasks(Collection>> t String taskName = taskEx.getTaskName(); LOG.error("The above error occurred while trying to execute task: {}", taskName); - ReconTaskStatusUpdater taskStatusUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(taskName); + ReconTaskStatusUpdater taskStatusUpdater = + taskStatusUpdaterManager.getTaskStatusUpdater(taskName); taskStatusUpdater.setLastTaskRunStatus(-1); taskStatusUpdater.recordRunCompletion(); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index 9efd3c9e995b..a81c5d183374 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -477,7 +477,7 @@ public void testGetKeysForContainer() throws IOException { setUpFSOData(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, - reconOMMetadataManager, new OzoneConfiguration()); + reconOMMetadataManager, new OzoneConfiguration(), 10); nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); // Reprocess the container key mapper to ensure the latest mapping is used reprocessContainerKeyMapper(); @@ -565,7 +565,7 @@ public void testGetKeysForContainerWithPrevKey() throws IOException { reprocessContainerKeyMapper(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, - reconOMMetadataManager, new OzoneConfiguration()); + reconOMMetadataManager, new OzoneConfiguration(), 10); nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); response = containerEndpoint.getKeysForContainer(20L, -1, "/0/1/2/file7"); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java index 52ac1d64f4b1..9b16643d0f3f 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java @@ -66,7 +66,6 @@ import javax.ws.rs.core.Response; 
import javax.ws.rs.core.UriInfo; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -135,6 +134,7 @@ import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS; import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask; +import org.apache.hadoop.ozone.recon.tasks.ReconOmTask; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.ozone.test.LambdaTestUtils; import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; @@ -789,9 +789,9 @@ public void testGetClusterState() throws Exception { }); omTableInsightTask.init(); // check volume, bucket and key count after running table count task - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(reconOMMetadataManager); - assertTrue(result.getRight()); + assertTrue(result.isTaskSuccess()); response = clusterStateEndpoint.getClusterState(); clusterStateResponse = (ClusterStateResponse) response.getEntity(); assertEquals(2, clusterStateResponse.getVolumes()); @@ -869,10 +869,10 @@ public void testGetFileCounts() throws Exception { .thenReturn(omKeyInfo3); // Call reprocess on both endpoints. - Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); - Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); - assertTrue(resultOBS.getRight()); - assertTrue(resultFSO.getRight()); + ReconOmTask.TaskResult resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + ReconOmTask.TaskResult resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + assertTrue(resultOBS.isTaskSuccess()); + assertTrue(resultFSO.isTaskSuccess()); // The two tasks should result in 3 rows. 
assertEquals(3, fileCountBySizeDao.count()); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java index 810c9096c91f..84d0807a3404 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java @@ -106,7 +106,7 @@ public void setUp() throws Exception { populateOMDB(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java index edb495421f87..77d5bb69a57a 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java @@ -389,7 +389,7 @@ public void setUp() throws Exception { populateOMDB(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); commonUtils = new CommonUtils(); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java index 8dca6ed566ba..1a01e125435e 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithLegacy.java @@ -386,7 +386,7 @@ public void setUp() throws Exception { populateOMDB(); NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager, - reconOMMetadataManager, conf); + reconOMMetadataManager, conf, 10); nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager); commonUtils = new CommonUtils(); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java index 7743b5aa3989..a162f48d8258 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java @@ -382,11 +382,11 @@ public void setUp() throws Exception { populateOMDB(); NSSummaryTaskWithOBS nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(reconNamespaceSummaryManager, - reconOMMetadataManager, conf); + reconOMMetadataManager, conf, 10); nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager); NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager, - reconOMMetadataManager, conf); + reconOMMetadataManager, conf, 10); 
nsSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager); commonUtils = new CommonUtils(); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java index 065c5ac2cad3..26fb6fe21b2c 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java @@ -321,13 +321,13 @@ public void setUp() throws Exception { setUpOmData(); nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy( reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS( reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO( reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); reconNamespaceSummaryManager.clearNSSummaryTable(); nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager); nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java index ce84eec41844..5123619416bd 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java @@ -116,7 +116,7 @@ public void setUp() throws Exception { populateOMDB(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, 10); nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java index aec2b7d6e2d2..2fc1c320d18b 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java @@ -19,8 +19,7 @@ import java.util.Collection; import java.util.Collections; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Map; import org.apache.hadoop.ozone.om.OMMetadataManager; /** @@ -52,20 +51,21 @@ public Collection getTaskTables() { } @Override - public Pair process(OMUpdateEventBatch events) { + public TaskResult process( + OMUpdateEventBatch events, Map seekPos) { if (++callCtr <= numFailuresAllowed) { - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } else { - return new ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } } @Override - public Pair reprocess(OMMetadataManager omMetadataManager) { + public TaskResult reprocess(OMMetadataManager omMetadataManager) { if (++callCtr <= numFailuresAllowed) { - return new ImmutablePair<>(getTaskName(), false); + return buildTaskResult(false); } else { - return new 
ImmutablePair<>(getTaskName(), true); + return buildTaskResult(true); } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index b78d970bacfa..36b335c1b46b 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -336,7 +336,7 @@ public void testKeyTableProcess() throws IOException { assertEquals(1, reconContainerMetadataManager.getKeyCountForContainer(3L)); // Process PUT & DELETE event. - containerKeyMapperTask.process(omUpdateEventBatch); + containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); @@ -427,7 +427,7 @@ public void testFileTableProcess() throws Exception { }, 0L); // Process PUT event for both the keys - containerKeyMapperTask.process(omUpdateEventBatch); + containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); @@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception { }, 0L); // Process DELETE event for key2 - containerKeyMapperTask.process(omUpdateEventBatch2); + containerKeyMapperTask.process(omUpdateEventBatch2, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java index 30ca22154bef..fc2f152ac016 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java @@ -33,8 +33,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.utils.db.TypedTable; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -130,12 +130,12 @@ public void testReprocess() throws IOException { fileCountBySizeDao.insert(new FileCountBySize("vol1", "bucket1", 1024L, 10L)); // Call reprocess on both tasks. - Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); - Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + ReconOmTask.TaskResult resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + ReconOmTask.TaskResult resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); // Verify that both tasks reported success. - assertTrue(resultOBS.getRight(), "OBS reprocess should return true"); - assertTrue(resultFSO.getRight(), "FSO reprocess should return true"); + assertTrue(resultOBS.isTaskSuccess(), "OBS reprocess should return true"); + assertTrue(resultFSO.isTaskSuccess(), "FSO reprocess should return true"); // After processing, there should be 3 rows (one per bin). 
assertEquals(3, fileCountBySizeDao.count(), "Expected 3 rows in the DB"); @@ -197,8 +197,8 @@ public void testProcess() { new OMUpdateEventBatch(Arrays.asList(event, event2), 0L); // Process the same batch on both endpoints. - fileSizeCountTaskOBS.process(omUpdateEventBatch); - fileSizeCountTaskFSO.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap()); + fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); // After processing the first batch: // Since each endpoint processes the same events, the counts are doubled. @@ -256,8 +256,8 @@ public void testProcess() { omUpdateEventBatch = new OMUpdateEventBatch( Arrays.asList(updateEvent, putEvent, deleteEvent), 0L); - fileSizeCountTaskOBS.process(omUpdateEventBatch); - fileSizeCountTaskFSO.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap()); + fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); assertEquals(4, fileCountBySizeDao.count()); recordToFind.value3(1024L); @@ -322,10 +322,10 @@ public void testReprocessAtScale() throws IOException { when(mockKeyValueFso.getValue()).thenAnswer(returnsElementsOf(omKeyInfoList)); // Call reprocess on both endpoints. - Pair resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); - Pair resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); - assertTrue(resultOBS.getRight()); - assertTrue(resultFSO.getRight()); + ReconOmTask.TaskResult resultOBS = fileSizeCountTaskOBS.reprocess(omMetadataManager); + ReconOmTask.TaskResult resultFSO = fileSizeCountTaskFSO.reprocess(omMetadataManager); + assertTrue(resultOBS.isTaskSuccess()); + assertTrue(resultFSO.isTaskSuccess()); // 2 volumes * 500 buckets * 42 bins = 42000 rows assertEquals(42000, fileCountBySizeDao.count()); @@ -393,8 +393,8 @@ public void testProcessAtScale() { OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L); // Process the same batch on both endpoints. - fileSizeCountTaskOBS.process(omUpdateEventBatch); - fileSizeCountTaskFSO.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap()); + fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); // Verify 2 keys are in correct bins. 
assertEquals(10000, fileCountBySizeDao.count()); @@ -470,8 +470,8 @@ public void testProcessAtScale() { } omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList, 0L); - fileSizeCountTaskOBS.process(omUpdateEventBatch); - fileSizeCountTaskFSO.process(omUpdateEventBatch); + fileSizeCountTaskOBS.process(omUpdateEventBatch, Collections.emptyMap()); + fileSizeCountTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); assertEquals(10000, fileCountBySizeDao.count()); recordToFind = dslContext diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java index e3241287ea56..ea7efaddfdfc 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTask.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Set; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -220,7 +221,7 @@ public class TestProcess { @BeforeEach public void setUp() throws IOException { nSSummaryTask.reprocess(reconOMMetadataManager); - nSSummaryTask.process(processEventBatch()); + nSSummaryTask.process(processEventBatch(), Collections.emptyMap()); nsSummaryForBucket1 = reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java index eb38084b4db7..83b722240510 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.recon.tasks; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager; @@ -27,12 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -47,12 +52,14 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider; import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; /** * Test for NSSummaryTaskWithFSO. @@ -118,7 +125,7 @@ public class TestNSSummaryTaskWithFSO { void setUp(@TempDir File tmpDir) throws Exception { ozoneConfiguration = new OzoneConfiguration(); ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, - 10); + 3); omMetadataManager = initializeNewOmMetadataManager(new File(tmpDir, "om")); OzoneManagerServiceProvider ozoneManagerServiceProvider = getMockOzoneManagerServiceProviderWithFSO(); @@ -141,9 +148,11 @@ void setUp(@TempDir File tmpDir) throws Exception { populateOMDB(); + long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong( + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 3); nSSummaryTaskWithFso = new NSSummaryTaskWithFSO( reconNamespaceSummaryManager, reconOMMetadataManager, - ozoneConfiguration); + ozoneConfiguration, nsSummaryFlushToDBMaxThreshold); } /** @@ -313,10 +322,12 @@ public class TestProcess { private OMDBUpdateEvent keyEvent6; private OMDBUpdateEvent keyEvent7; + private Pair<Integer, Boolean> result; + @BeforeEach public void setUp() throws IOException { nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); - nSSummaryTaskWithFso.processWithFSO(processEventBatch()); + result = nSSummaryTaskWithFso.processWithFSO(processEventBatch(), 0); } private OMUpdateEventBatch processEventBatch() throws IOException { @@ -511,6 +522,97 @@ public void testParentIdAfterProcessEventBatch() throws IOException { "DIR_FIVE's parent ID should match BUCKET_TWO_OBJECT_ID."); } + @Test + void testProcessWithFSOFlushAfterThresholdAndSuccess() throws IOException { + // The method under test, processWithFSO, was already invoked in setUp(); assert on its captured result. + + // Assertions + Assertions.assertNotNull(result, "Result should not be null"); + // seekPos should be 7: the flush threshold is set to 3 and the batch holds 7 events, so the + // nsSummaryMap is flushed in two batches inside the loop; during the second batch flush the + // eventCounter is 6, and the last event7 alone is flushed after the loop as a remaining event. + // On every threshold-based flush, seekPos is set to eventCounter + 1, so seekPos ends at 7.
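+ // Illustrative trace of that arithmetic, assuming the threshold of 3 set in setUp() and the + // 7 events built by processEventBatch(): + // events 1..3 -> eventCounter = 3 -> flush #1, seekPos = 3 + 1 = 4 + // events 4..6 -> eventCounter = 6 -> flush #2, seekPos = 6 + 1 = 7 + // event 7 -> below threshold, flushed after the loop, seekPos stays 7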
+ Assertions.assertEquals(7, result.getLeft(), "seekPos should be 7"); + Assertions.assertTrue(result.getRight(), "Processing should succeed when every flush succeeds"); + } + + @Test + void testProcessWithFSOFlushAfterThresholdAndFailureOfLastElement() + throws NoSuchFieldException, IllegalAccessException { + // Create a mocked NSSummaryTaskWithFSO instance + NSSummaryTaskWithFSO task = mock(NSSummaryTaskWithFSO.class); + + // Set the value of nsSummaryFlushToDBMaxThreshold to 3 using reflection + Field thresholdField = NSSummaryTaskWithFSO.class.getDeclaredField("nsSummaryFlushToDBMaxThreshold"); + thresholdField.setAccessible(true); + thresholdField.set(task, 3); + + ReconNamespaceSummaryManager mockReconNamespaceSummaryManager = mock(ReconNamespaceSummaryManager.class); + Field managerField = NSSummaryTaskDbEventHandler.class.getDeclaredField("reconNamespaceSummaryManager"); + managerField.setAccessible(true); + managerField.set(task, mockReconNamespaceSummaryManager); + + // Mock the OMUpdateEventBatch and its iterator + OMUpdateEventBatch events = mock(OMUpdateEventBatch.class); + Iterator mockIterator = mock(Iterator.class); + + Mockito.when(events.getIterator()).thenReturn(mockIterator); + + // Mock OMDBUpdateEvent objects and their behavior + OMDBUpdateEvent event1 = mock(OMDBUpdateEvent.class); + OMDBUpdateEvent event2 = mock(OMDBUpdateEvent.class); + OMDBUpdateEvent event3 = mock(OMDBUpdateEvent.class); + OMDBUpdateEvent event4 = mock(OMDBUpdateEvent.class); + + // Mock getAction() for each event + Mockito.when(event1.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT); + Mockito.when(event2.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT); + Mockito.when(event3.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT); + Mockito.when(event4.getAction()).thenReturn(OMDBUpdateEvent.OMDBUpdateAction.PUT); + + OmKeyInfo keyInfo1 = new OmKeyInfo.Builder().setParentObjectID(1).setObjectID(2).setKeyName("key1") + .setBucketName("bucket1") + .setDataSize(1024).setVolumeName("volume1").build(); + OmKeyInfo keyInfo2 = new OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2") + .setBucketName("bucket1") + .setDataSize(1024).setVolumeName("volume1").build(); + OmKeyInfo keyInfo3 = new OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2") + .setBucketName("bucket1") + .setDataSize(1024).setVolumeName("volume1").build(); + OmKeyInfo keyInfo4 = new OmKeyInfo.Builder().setParentObjectID(1).setObjectID(3).setKeyName("key2") + .setBucketName("bucket1") + .setDataSize(1024).setVolumeName("volume1").build(); + Mockito.when(event1.getValue()).thenReturn(keyInfo1); + Mockito.when(event2.getValue()).thenReturn(keyInfo2); + Mockito.when(event3.getValue()).thenReturn(keyInfo3); + Mockito.when(event4.getValue()).thenReturn(keyInfo4); + + // Mock getTable() to return valid table name + Mockito.when(event1.getTable()).thenReturn(FILE_TABLE); + Mockito.when(event2.getTable()).thenReturn(FILE_TABLE); + Mockito.when(event3.getTable()).thenReturn(FILE_TABLE); + Mockito.when(event4.getTable()).thenReturn(FILE_TABLE); + + // Mock iterator to return the events + Mockito.when(mockIterator.hasNext()).thenReturn(true, true, true, true, false); + Mockito.when(mockIterator.next()).thenReturn(event1, event2, event3, event4); + + // Mock the flushAndCommitNSToDB method to fail on the last flush + NSSummaryTaskWithFSO taskSpy = Mockito.spy(task); +
Mockito.doReturn(true).doReturn(true).doReturn(false).when(taskSpy).flushAndCommitNSToDB(Mockito.anyMap()); + + // Call the method under test + Pair<Integer, Boolean> result1 = taskSpy.processWithFSO(events, 0); + + // Assertions + Assertions.assertNotNull(result1, "Result should not be null"); + Assertions.assertEquals(0, result1.getLeft(), "seekPos should remain 0 when a flush fails"); + + // Verify interactions + Mockito.verify(mockIterator, Mockito.times(3)).next(); + Mockito.verify(taskSpy, Mockito.times(1)).flushAndCommitNSToDB(Mockito.anyMap()); + } } /** diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java index ceb88a36563d..4d1f58e6712a 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacy.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDirToOm; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -36,8 +37,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; -import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmConfig; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -142,9 +143,11 @@ void setUp(@TempDir File tmpDir) throws Exception { populateOMDB(); + long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong( + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10); nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy( reconNamespaceSummaryManager, - reconOMMetadataManager, omConfiguration); + reconOMMetadataManager, omConfiguration, nsSummaryFlushToDBMaxThreshold); } /** @@ -292,7 +295,7 @@ public class TestProcess { @BeforeEach public void setUp() throws IOException { nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager); - nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch()); + nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0); nsSummaryForBucket1 = reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID); @@ -689,8 +692,8 @@ private void initializeNewOmMetadataManager( omConfiguration = new OzoneConfiguration(); omConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath()); - omConfiguration.set(OMConfigKeys - .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true"); + omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true"); + omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10"); omMetadataManager = new OmMetadataManagerImpl( omConfiguration, null); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java index 766478b87167..48054d1eed39 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithLegacyOBSLayout.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -115,6 +116,8 @@ void setUp(@TempDir File tmpDir) throws Exception { ozoneConfiguration = new OzoneConfiguration(); ozoneConfiguration.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, false); + ozoneConfiguration.setLong(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, + 10); ReconTestInjector reconTestInjector = new ReconTestInjector.Builder(tmpDir) @@ -132,9 +135,12 @@ void setUp(@TempDir File tmpDir) throws Exception { populateOMDB(); + long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong( + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, 10); nSSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy( reconNamespaceSummaryManager, - reconOMMetadataManager, ozoneConfiguration); + reconOMMetadataManager, ozoneConfiguration, + nsSummaryFlushToDBMaxThreshold); } /** @@ -240,7 +246,7 @@ public void setUp() throws IOException { // reinit Recon RocksDB's namespace CF. 
reconNamespaceSummaryManager.clearNSSummaryTable(); nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager); - nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch()); + nSSummaryTaskWithLegacy.processWithLegacy(processEventBatch(), 0); nsSummaryForBucket1 = reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java index 0db53aee1299..386a5539f125 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getMockOzoneManagerServiceProviderWithFSO; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeKeyToOm; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -34,8 +36,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; -import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmConfig; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -126,9 +128,13 @@ void setUp(@TempDir File tmpDir) throws Exception { populateOMDB(); + long nsSummaryFlushToDBMaxThreshold = omConfiguration.getLong( + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, + OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); nSSummaryTaskWithOBS = new NSSummaryTaskWithOBS( reconNamespaceSummaryManager, - reconOMMetadataManager, omConfiguration); + reconOMMetadataManager, omConfiguration, + nsSummaryFlushToDBMaxThreshold); } /** @@ -234,7 +240,7 @@ public void setUp() throws IOException { // reinit Recon RocksDB's namespace CF. 
reconNamespaceSummaryManager.clearNSSummaryTable(); nSSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager); - nSSummaryTaskWithOBS.processWithOBS(processEventBatch()); + nSSummaryTaskWithOBS.processWithOBS(processEventBatch(), 0); nsSummaryForBucket1 = reconNamespaceSummaryManager.getNSSummary(BUCKET_ONE_OBJECT_ID); @@ -458,8 +464,8 @@ private void initializeNewOmMetadataManager( omConfiguration = new OzoneConfiguration(); omConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath()); - omConfiguration.set(OMConfigKeys - .OZONE_OM_ENABLE_FILESYSTEM_PATHS, "true"); + omConfiguration.set(OmConfig.Keys.ENABLE_FILESYSTEM_PATHS, "true"); + omConfiguration.set(OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, "10"); omMetadataManager = new OmMetadataManagerImpl( omConfiguration, null); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java index 040975a5ef3d..dc8d34f335c5 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java @@ -45,9 +45,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -155,7 +155,7 @@ private void initializeInjector() throws IOException { globalStatsDao, getConfiguration(), reconOMMetadataManager); nSSummaryTaskWithFso = new NSSummaryTaskWithFSO( reconNamespaceSummaryManager, reconOMMetadataManager, - ozoneConfiguration); + ozoneConfiguration, 10); dslContext = getDslContext(); omTableInsightTask.setTables(omTableInsightTask.getTaskTables()); @@ -289,9 +289,9 @@ public void testReprocessForDeletedDirectory() throws Exception { // Generate NamespaceSummary for the OM DB nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager); - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(reconOMMetadataManager); - assertTrue(result.getRight()); + assertTrue(result.isTaskSuccess()); assertEquals(3, getCountForTable(DELETED_DIR_TABLE)); } @@ -329,7 +329,7 @@ public void testProcessForDeletedDirectoryTable() throws IOException { DELETED_DIR_TABLE, PUT, null)); } OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L); - omTableInsightTask.process(putEventBatch); + omTableInsightTask.process(putEventBatch, Collections.emptyMap()); assertEquals(5, getCountForTable(DELETED_DIR_TABLE)); @@ -343,7 +343,7 @@ public void testProcessForDeletedDirectoryTable() throws IOException { getOmKeyInfo("vol1", "bucket1", DIR_ONE, 3L, false), DELETED_DIR_TABLE, DELETE, null)); OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 0L); - omTableInsightTask.process(deleteEventBatch); + omTableInsightTask.process(deleteEventBatch, Collections.emptyMap()); assertEquals(3, getCountForTable(DELETED_DIR_TABLE)); } @@ -376,10 +376,10 @@ public void testReprocessForCount() throws Exception { when(mockIter.next()).thenReturn(mockKeyValue); } - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(omMetadataManager); - assertTrue(result.getRight()); + 
assertTrue(result.isTaskSuccess()); assertEquals(5L, getCountForTable(KEY_TABLE)); assertEquals(5L, getCountForTable(VOLUME_TABLE)); assertEquals(5L, getCountForTable(BUCKET_TABLE)); @@ -397,9 +397,9 @@ public void testReprocessForOpenKeyTable() throws Exception { writeOpenKeyToOm(reconOMMetadataManager, "key1", "Bucket3", "Volume3", null, 3L); - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(reconOMMetadataManager); - assertTrue(result.getRight()); + assertTrue(result.isTaskSuccess()); assertEquals(3L, getCountForTable(OPEN_KEY_TABLE)); // Test for both replicated and unreplicated size for OPEN_KEY_TABLE assertEquals(6L, getUnReplicatedSizeForTable(OPEN_KEY_TABLE)); @@ -416,9 +416,9 @@ public void testReprocessForOpenFileTable() throws Exception { writeOpenFileToOm(reconOMMetadataManager, "file3", "Bucket3", "Volume3", "file3", 3, 0, 3, 3, null, 3L); - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(reconOMMetadataManager); - assertTrue(result.getRight()); + assertTrue(result.isTaskSuccess()); assertEquals(3L, getCountForTable(OPEN_FILE_TABLE)); // Test for both replicated and unreplicated size for OPEN_FILE_TABLE assertEquals(6L, getUnReplicatedSizeForTable(OPEN_FILE_TABLE)); @@ -440,9 +440,9 @@ public void testReprocessForDeletedTable() throws Exception { deletedKeysList3, "Bucket3", "Volume3"); - Pair result = + ReconOmTask.TaskResult result = omTableInsightTask.reprocess(reconOMMetadataManager); - assertTrue(result.getRight()); + assertTrue(result.isTaskSuccess()); assertEquals(6L, getCountForTable(DELETED_TABLE)); // Test for both replicated and unreplicated size for DELETED_TABLE assertEquals(600L, getUnReplicatedSizeForTable(DELETED_TABLE)); @@ -479,7 +479,7 @@ public void testProcessForCount() { // Processing the initial batch of events OMUpdateEventBatch initialBatch = new OMUpdateEventBatch(initialEvents, 0L); - omTableInsightTask.process(initialBatch); + omTableInsightTask.process(initialBatch, Collections.emptyMap()); // Verifying the count in each table for (String tableName : omTableInsightTask.getTaskTables()) { @@ -508,7 +508,7 @@ public void testProcessForCount() { // Processing the additional events OMUpdateEventBatch additionalBatch = new OMUpdateEventBatch(additionalEvents, 0L); - omTableInsightTask.process(additionalBatch); + omTableInsightTask.process(additionalBatch, Collections.emptyMap()); // Verifying the final count in each table for (String tableName : omTableInsightTask.getTaskTables()) { if (tableName.equals(DELETED_TABLE)) { @@ -537,7 +537,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() { } OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L); - omTableInsightTask.process(putEventBatch); + omTableInsightTask.process(putEventBatch, Collections.emptyMap()); // After 5 PUTs, size should be 5 * 1000 = 5000 for (String tableName : new ArrayList<>( @@ -555,7 +555,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() { getOMUpdateEvent("item0", omKeyInfo, OPEN_FILE_TABLE, DELETE, null)); OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 0L); - omTableInsightTask.process(deleteEventBatch); + omTableInsightTask.process(deleteEventBatch, Collections.emptyMap()); // After deleting "item0", size should be 4 * 1000 = 4000 for (String tableName : new ArrayList<>( @@ -578,7 +578,7 @@ public void testProcessForOpenKeyTableAndOpenFileTable() { } OMUpdateEventBatch updateEventBatch = new OMUpdateEventBatch(updateEvents, 0L); - 
omTableInsightTask.process(updateEventBatch); + omTableInsightTask.process(updateEventBatch, Collections.emptyMap()); // After updating "item1", size should be 4000 - 1000 + 2000 = 5000 // presentValue - oldValue + newValue = updatedValue @@ -615,7 +615,7 @@ public void testProcessForDeletedTable() { null)); } OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents, 0L); - omTableInsightTask.process(putEventBatch); + omTableInsightTask.process(putEventBatch, Collections.emptyMap()); // Each of the 5 RepeatedOmKeyInfo object has 5 OmKeyInfo obj, // so total deleted keys should be 5 * 5 = 25 assertEquals(25L, getCountForTable(DELETED_TABLE)); @@ -631,7 +631,7 @@ public void testProcessForDeletedTable() { getOMUpdateEvent("item0", repeatedOmKeyInfo, DELETED_TABLE, DELETE, null)); OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents, 0L); - omTableInsightTask.process(deleteEventBatch); + omTableInsightTask.process(deleteEventBatch, Collections.emptyMap()); // After deleting "item0" total deleted keys should be 20 assertEquals(20L, getCountForTable(DELETED_TABLE)); // After deleting "item0", size should be 4 * 1000 = 4000 diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index 384da46aa16a..ad7eafd1600b 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import java.util.HashSet; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; @@ -81,8 +81,8 @@ public void testRegisterTask() { @Test public void testConsumeOMEvents() throws Exception { ReconOmTask reconOmTaskMock = getMockTask("MockTask"); - when(reconOmTaskMock.process(any(OMUpdateEventBatch.class))) - .thenReturn(new ImmutablePair<>("MockTask", true)); + when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap())) + .thenReturn(new ReconOmTask.TaskResult.Builder().setTaskName("MockTask").setTaskSuccess(true).build()); reconTaskController.registerTask(reconOmTaskMock); OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); @@ -94,7 +94,7 @@ public void testConsumeOMEvents() throws Exception { mock(OMMetadataManager.class)); verify(reconOmTaskMock, times(1)) - .process(any()); + .process(any(), anyMap()); long endTime = System.currentTimeMillis(); ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask"); @@ -111,7 +111,7 @@ public void testTaskRecordsFailureOnException() throws Exception { OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); // Throw exception when trying to run task - 
when(reconOmTaskMock.process(any(OMUpdateEventBatch.class))) + when(reconOmTaskMock.process(any(OMUpdateEventBatch.class), anyMap())) .thenThrow(new RuntimeException("Mock Failure")); reconTaskController.registerTask(reconOmTaskMock); when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); @@ -123,7 +123,7 @@ public void testTaskRecordsFailureOnException() throws Exception { mock(OMMetadataManager.class)); verify(reconOmTaskMock, times(1)) - .process(any()); + .process(any(), anyMap()); long endTime = System.currentTimeMillis(); ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask"); @@ -209,7 +209,7 @@ public void testReInitializeTasks() throws Exception { ReconOmTask reconOmTaskMock = getMockTask("MockTask2"); when(reconOmTaskMock.reprocess(omMetadataManagerMock)) - .thenReturn(new ImmutablePair<>("MockTask2", true)); + .thenReturn(new ReconOmTask.TaskResult.Builder().setTaskName("MockTask2").setTaskSuccess(true).build()); when(omMetadataManagerMock.getLastSequenceNumberFromDB() ).thenReturn(100L);