From 2f26854a6adf31c966a8dc35aa05b2b2c71a90f9 Mon Sep 17 00:00:00 2001 From: arafat Date: Tue, 4 Mar 2025 13:08:25 +0530 Subject: [PATCH 01/11] Rebased the PR --- .../hadoop/ozone/recon/ReconConstants.java | 3 + .../ozone/recon/ReconControllerModule.java | 6 +- .../recon/tasks/ContainerKeyMapperHelper.java | 473 +++++++++++++++++ .../recon/tasks/ContainerKeyMapperTask.java | 500 ------------------ .../tasks/ContainerKeyMapperTaskFSO.java | 67 +++ .../tasks/ContainerKeyMapperTaskOBS.java | 67 +++ .../recon/api/TestContainerEndpoint.java | 14 +- .../recon/api/TestOmDBInsightEndPoint.java | 6 +- .../tasks/TestContainerKeyMapperTask.java | 28 +- 9 files changed, 640 insertions(+), 524 deletions(-) create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java index ecd88f80995f..8e034c781769 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java @@ -95,11 +95,14 @@ private ReconConstants() { // For file-size count reprocessing: ensure only one task truncates the table. public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false); + public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false); + /** * Resets the table-truncated flag for the given tables. This should be called once per reprocess cycle, * for example by the OM task controller, before the tasks run. 
*/ public static void resetTableTruncatedFlags() { FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false); + CONTAINER_KEY_TABLES_TRUNCATED.set(false); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 5bbda0356217..ad2d25a29d4e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -58,7 +58,8 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; -import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask; @@ -131,7 +132,8 @@ static class ReconOmTaskBindingModule extends AbstractModule { protected void configure() { Multibinder taskBinder = Multibinder.newSetBinder(binder(), ReconOmTask.class); - taskBinder.addBinding().to(ContainerKeyMapperTask.class); + taskBinder.addBinding().to(ContainerKeyMapperTaskFSO.class); + taskBinder.addBinding().to(ContainerKeyMapperTaskOBS.class); taskBinder.addBinding().to(FileSizeCountTaskFSO.class); taskBinder.addBinding().to(FileSizeCountTaskOBS.class); taskBinder.addBinding().to(OmTableInsightTask.class); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java new file mode 100644 index 
000000000000..3559c8015b0e --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.recon.ReconConstants; +import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; 
+import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class that encapsulates the common logic for ContainerKeyMapperTaskFSO and ContainerKeyMapperTaskOBS.
+ *
+ * <p>Every method is static and works on the in-memory maps handed in by the caller:
+ * <ul>
+ *   <li>{@code containerKeyMap}: (container, key, version) -> count, the rows to be written;</li>
+ *   <li>{@code containerKeyCountMap}: containerId -> number of keys in that container;</li>
+ *   <li>{@code deletedContainerKeyList}: (container, key, version) rows to be removed.</li>
+ * </ul>
+ */
+public abstract class ContainerKeyMapperHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerKeyMapperHelper.class);
+
+  // Static lock to guard table truncation.
+  private static final Object TRUNCATE_LOCK = new Object();
+
+  /**
+   * Ensures that the container key tables are truncated only once before reprocessing.
+   * Uses an AtomicBoolean to track if truncation has already been performed.
+   *
+   * @param reconContainerMetadataManager The metadata manager instance responsible for DB operations.
+   * @throws RuntimeException wrapping the {@link IOException} if the truncation fails; the
+   *                          truncated flag is reset first so that a later call can retry.
+   */
+  public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconContainerMetadataManager) {
+    try {
+      truncateTablesOnce(reconContainerMetadataManager);
+    } catch (IOException e) {
+      throw new RuntimeException("Table truncation failed", e);
+    }
+  }
+
+  /**
+   * Performs the guarded truncation and reports failure as a checked exception, so that
+   * {@code reprocess} can turn it into a {@code false} result like its other failure paths.
+   *
+   * @param reconContainerMetadataManager The metadata manager instance responsible for DB operations.
+   * @throws IOException if re-initialising the container tables fails.
+   */
+  private static void truncateTablesOnce(ReconContainerMetadataManager reconContainerMetadataManager)
+      throws IOException {
+    // The flag is flipped and the truncation is executed under the same lock, so a caller that
+    // reaches the else-branch only does so after the truncating caller has left this block.
+    synchronized (TRUNCATE_LOCK) {
+      if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, true)) {
+        try {
+          // Perform table truncation
+          reconContainerMetadataManager.reinitWithNewContainerDataFromOm(new HashMap<>());
+          LOG.info("Successfully truncated container key tables.");
+        } catch (IOException e) {
+          // Reset the flag so truncation can be retried
+          ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false);
+          LOG.error("Error while truncating container key tables. Resetting flag.", e);
+          throw e;
+        }
+      } else {
+        LOG.info("Container key tables already truncated by another task, waiting for truncation to complete.");
+      }
+    }
+  }
+
+  /**
+   * Rebuilds the container -> key mapping for one bucket layout by iterating over the whole
+   * OM key table of that layout.
+   *
+   * <p>{@code containerKeyMap} is flushed whenever it reaches
+   * {@code containerKeyFlushToDBMaxThreshold} entries; {@code containerKeyCountMap} is written
+   * once, together with the remaining key mappings, after the iteration has finished.
+   *
+   * <p>NOTE(review): the per-container counts are seeded from the DB and kept in a map local to
+   * this call until the final flush. If two layouts are reprocessed at the same time and share
+   * containers, the later flush could overwrite the earlier count - confirm how the callers
+   * schedule the FSO and OBS tasks.
+   *
+   * @param omMetadataManager                 OM metadata manager providing the key table.
+   * @param reconContainerMetadataManager     Recon metadata manager instance.
+   * @param bucketLayout                      layout that selects which OM key table is read.
+   * @param taskName                          name of the calling task, used in log messages only.
+   * @param containerKeyFlushToDBMaxThreshold size of {@code containerKeyMap} that triggers a flush.
+   * @return true if all data was written, false if truncation, iteration or any flush failed.
+   */
+  public static boolean reprocess(OMMetadataManager omMetadataManager,
+                                  ReconContainerMetadataManager reconContainerMetadataManager,
+                                  BucketLayout bucketLayout,
+                                  String taskName,
+                                  long containerKeyFlushToDBMaxThreshold) {
+    long omKeyCount = 0;
+    // (container, key) -> count
+    Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
+    // containerId -> key count
+    Map<Long, Long> containerKeyCountMap = new HashMap<>();
+
+    try {
+      LOG.info("Starting a 'reprocess' run for {}.", taskName);
+      Instant start = Instant.now();
+
+      // Ensure the tables are truncated only once. A failure surfaces as IOException and is
+      // reported through the catch block below instead of escaping as a RuntimeException.
+      truncateTablesOnce(reconContainerMetadataManager);
+
+      // Get the appropriate table based on BucketLayout
+      Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout);
+
+      // Iterate through the table and process keys
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyIter =
+               omKeyInfoTable.iterator()) {
+        while (keyIter.hasNext()) {
+          Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
+          handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap,
+              reconContainerMetadataManager);
+          omKeyCount++;
+
+          // Check and flush data if it reaches the batch threshold
+          if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,
+              reconContainerMetadataManager)) {
+            LOG.error("Failed to flush container key data for {}", taskName);
+            return false;
+          }
+        }
+      }
+
+      // Flush and commit changes
+      if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, containerKeyCountMap, reconContainerMetadataManager)) {
+        LOG.error("Failed to flush Container Key data to DB for {}", taskName);
+        return false;
+      }
+
+      LOG.info("Completed 'reprocess' for {}. Processed {} keys.", taskName, omKeyCount);
+      Instant end = Instant.now();
+      long duration = Duration.between(start, end).toMillis();
+      LOG.info("Total time: {} seconds.", (double) duration / 1000.0);
+
+    } catch (IOException ioEx) {
+      LOG.error("Error populating Container Key data for {} in Recon DB.", taskName, ioEx);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Flushes {@code containerKeyMap} to the DB once it holds at least
+   * {@code containerKeyFlushToDBMaxThreshold} entries. The key counts are not written here.
+   *
+   * @return false only if a flush was attempted and failed.
+   */
+  private static boolean checkAndCallFlushToDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                               long containerKeyFlushToDBMaxThreshold,
+                                               ReconContainerMetadataManager reconContainerMetadataManager) {
+    if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) {
+      return flushAndCommitContainerKeyInfoToDB(containerKeyMap, Collections.emptyMap(), reconContainerMetadataManager);
+    }
+    return true;
+  }
+
+  /**
+   * Applies a batch of OM DB update events to the container -> key mapping.
+   * Events whose table differs from {@code tableName} are ignored.
+   *
+   * @param events                        batch of OM DB update events.
+   * @param tableName                     name of the OM table this task is responsible for.
+   * @param reconContainerMetadataManager Recon metadata manager instance.
+   * @param taskName                      name of the calling task, used in log messages only.
+   * @return true if every relevant event was handled and the result was written to the DB.
+   */
+  public static boolean process(OMUpdateEventBatch events,
+                                String tableName,
+                                ReconContainerMetadataManager reconContainerMetadataManager,
+                                String taskName) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    int eventCount = 0;
+
+    // In-memory maps for fast look up and batch write
+    // (HDDS-8580) containerKeyMap map is allowed to be used
+    // in "process" without batching since the maximum number of keys
+    // is bounded by delta limit configurations
+
+    // (container, key) -> count
+    Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>();
+    // containerId -> key count
+    Map<Long, Long> containerKeyCountMap = new HashMap<>();
+    // List of the deleted (container, key) pair's
+    List<ContainerKeyPrefix> deletedKeyCountList = new ArrayList<>();
+    long startTime = System.currentTimeMillis();
+
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
+      // Filter event inside process method to avoid duping
+      if (!tableName.equals(omdbUpdateEvent.getTable())) {
+        continue;
+      }
+      String updatedKey = omdbUpdateEvent.getKey();
+      OmKeyInfo updatedKeyValue = omdbUpdateEvent.getValue();
+      try {
+        switch (omdbUpdateEvent.getAction()) {
+        case PUT:
+          handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
+          break;
+
+        case DELETE:
+          handleDeleteOMKeyEvent(updatedKey, containerKeyMap,
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
+          break;
+
+        case UPDATE:
+          // An update is applied as "delete the old mapping, then put the new one".
+          // NOTE(review): the delete is keyed by the old value's key *name*, while the put is
+          // keyed by the event key - confirm both use the same key format.
+          if (omdbUpdateEvent.getOldValue() != null) {
+            handleDeleteOMKeyEvent(
+                omdbUpdateEvent.getOldValue().getKeyName(), containerKeyMap,
+                containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
+          } else {
+            LOG.warn("Update event does not have the old Key Info for {}.", updatedKey);
+          }
+          handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap,
+              containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
+          break;
+
+        default:
+          LOG.info("Skipping DB update event: {}", omdbUpdateEvent.getAction());
+        }
+        eventCount++;
+      } catch (IOException e) {
+        LOG.error("Unexpected exception while updating key data: {} ", updatedKey, e);
+        return false;
+      }
+    }
+    try {
+      writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager);
+    } catch (IOException e) {
+      LOG.error("Unable to write Container Key Prefix data in Recon DB.", e);
+      return false;
+    }
+    LOG.info("{} successfully processed {} OM DB update event(s) in {} milliseconds.",
+        taskName, eventCount, (System.currentTimeMillis() - startTime));
+    return true;
+  }
+
+  /**
+   * Note to add an OM key and update containerID -> no. of keys count.
+   *
+   * @param key key String
+   * @param omKeyInfo omKeyInfo value
+   * @param containerKeyMap we keep the added containerKeys in this map
+   *                        (in this batch)
+   * @param containerKeyCountMap we keep the containerKey counts in this map
+   * @param deletedContainerKeyList list of the deleted containerKeys
+   * @param reconContainerMetadataManager Recon metadata manager instance
+   * @throws IOException if unable to write to recon DB.
+   */
+  private static void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo,
+                                          Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                          Map<Long, Long> containerKeyCountMap,
+                                          List<ContainerKeyPrefix> deletedContainerKeyList,
+                                          ReconContainerMetadataManager reconContainerMetadataManager)
+      throws IOException {
+    long containerCountToIncrement = 0;
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo.getKeyLocationVersions()) {
+      long keyVersion = omKeyLocationInfoGroup.getVersion();
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup.getLocationList()) {
+        long containerId = omKeyLocationInfo.getContainerID();
+        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(containerId, key, keyVersion);
+        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix) == 0 &&
+            !containerKeyMap.containsKey(containerKeyPrefix)) {
+          // Save on writes. No need to save same container-key prefix
+          // mapping again.
+          containerKeyMap.put(containerKeyPrefix, 1);
+          // Remove the container-key prefix from the deleted list if we
+          // previously deleted it in this batch (and now we add it again)
+          deletedContainerKeyList.remove(containerKeyPrefix);
+
+          // check if container already exists and
+          // increment the count of containers if it does not exist
+          if (!reconContainerMetadataManager.doesContainerExists(containerId) &&
+              !containerKeyCountMap.containsKey(containerId)) {
+            containerCountToIncrement++;
+          }
+
+          // update the count of keys for the given containerID
+          long keyCount;
+          if (containerKeyCountMap.containsKey(containerId)) {
+            keyCount = containerKeyCountMap.get(containerId);
+          } else {
+            keyCount = reconContainerMetadataManager.getKeyCountForContainer(containerId);
+          }
+
+          // increment the count and update containerKeyCount.
+          // keyCount will be 0 if containerID is not found. So, there is no
+          // need to initialize keyCount for the first time.
+          containerKeyCountMap.put(containerId, ++keyCount);
+        }
+      }
+    }
+
+    if (containerCountToIncrement > 0) {
+      reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement);
+    }
+  }
+
+  /**
+   * Note to delete an OM Key and update the containerID -> no. of keys counts
+   * (we are preparing for batch deletion in these data structures).
+   *
+   * @param key key String.
+   * @param containerKeyMap we keep the added containerKeys in this map
+   *                        (in this batch)
+   * @param containerKeyCountMap we keep the containerKey counts in this map
+   * @param deletedContainerKeyList list of the deleted containerKeys
+   * @param reconContainerMetadataManager Recon metadata manager instance
+   * @throws IOException If Unable to write to container DB.
+   */
+  private static void handleDeleteOMKeyEvent(String key,
+                                             Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                             Map<Long, Long> containerKeyCountMap,
+                                             List<ContainerKeyPrefix> deletedContainerKeyList,
+                                             ReconContainerMetadataManager reconContainerMetadataManager)
+      throws IOException {
+
+    Set<ContainerKeyPrefix> keysToBeDeleted = new HashSet<>();
+    try (TableIterator<KeyPrefixContainer, ? extends Table.KeyValue<KeyPrefixContainer, Integer>>
+             keyContainerIterator = reconContainerMetadataManager.getKeyContainerTableIterator()) {
+
+      // Check if we have keys in this container in the DB
+      keyContainerIterator.seek(KeyPrefixContainer.get(key));
+      while (keyContainerIterator.hasNext()) {
+        Table.KeyValue<KeyPrefixContainer, Integer> keyValue = keyContainerIterator.next();
+        String keyPrefix = keyValue.getKey().getKeyPrefix();
+        if (keyPrefix.equals(key)) {
+          if (keyValue.getKey().getContainerId() != -1) {
+            keysToBeDeleted.add(keyValue.getKey().toContainerKeyPrefix());
+          }
+        } else {
+          // Stop at the first row that belongs to a different key.
+          break;
+        }
+      }
+    }
+
+    // Check if we have keys in this container in our containerKeyMap
+    for (ContainerKeyPrefix containerKeyPrefix : containerKeyMap.keySet()) {
+      if (containerKeyPrefix.getKeyPrefix().equals(key)) {
+        keysToBeDeleted.add(containerKeyPrefix);
+      }
+    }
+
+    for (ContainerKeyPrefix containerKeyPrefix : keysToBeDeleted) {
+      deletedContainerKeyList.add(containerKeyPrefix);
+      // Remove the container-key prefix from the map if we previously added
+      // it in this batch (and now we delete it)
+      containerKeyMap.remove(containerKeyPrefix);
+
+      // Decrement count and update containerKeyCount.
+      Long containerID = containerKeyPrefix.getContainerId();
+      long keyCount;
+      if (containerKeyCountMap.containsKey(containerID)) {
+        keyCount = containerKeyCountMap.get(containerID);
+      } else {
+        keyCount = reconContainerMetadataManager.getKeyCountForContainer(containerID);
+      }
+      if (keyCount > 0) {
+        containerKeyCountMap.put(containerID, --keyCount);
+      }
+    }
+  }
+
+  /**
+   * Writes container key data to the database in a single batch.
+   *
+   * <p>If staging any entry fails, the exception is propagated and the batch is not committed,
+   * so a partially built batch never reaches the DB; callers report the failure.
+   *
+   * @param containerKeyMap               Map containing container key mappings.
+   * @param containerKeyCountMap          Map containing container key counts.
+   * @param deletedContainerKeyList       List of deleted container keys.
+   * @param reconContainerMetadataManager Recon metadata manager instance.
+   * @throws IOException If unable to write to the Recon DB.
+   */
+  private static void writeToTheDB(Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                   Map<Long, Long> containerKeyCountMap,
+                                   List<ContainerKeyPrefix> deletedContainerKeyList,
+                                   ReconContainerMetadataManager reconContainerMetadataManager)
+      throws IOException {
+
+    try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) {
+
+      // Write container key mappings
+      for (Map.Entry<ContainerKeyPrefix, Integer> entry : containerKeyMap.entrySet()) {
+        reconContainerMetadataManager.batchStoreContainerKeyMapping(
+            rdbBatchOperation, entry.getKey(), entry.getValue());
+      }
+
+      // Write container key count mappings
+      for (Map.Entry<Long, Long> entry : containerKeyCountMap.entrySet()) {
+        reconContainerMetadataManager.batchStoreContainerKeyCounts(
+            rdbBatchOperation, entry.getKey(), entry.getValue());
+      }
+
+      // Delete container key mappings
+      for (ContainerKeyPrefix deletedKey : deletedContainerKeyList) {
+        reconContainerMetadataManager.batchDeleteContainerMapping(rdbBatchOperation, deletedKey);
+      }
+
+      // Commit batch operation
+      reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation);
+    }
+  }
+
+  /**
+   * Handles reprocessing of OM keys for container key mapping.
+   *
+   * @param key                           key String
+   * @param omKeyInfo                     OmKeyInfo value
+   * @param containerKeyMap               map of (container, key) -> count
+   * @param containerKeyCountMap          map of containerId -> key count
+   * @param reconContainerMetadataManager Recon metadata manager instance
+   * @throws IOException if unable to write to Recon DB.
+   */
+  public static void handleKeyReprocess(String key,
+                                        OmKeyInfo omKeyInfo,
+                                        Map<ContainerKeyPrefix, Integer> containerKeyMap,
+                                        Map<Long, Long> containerKeyCountMap,
+                                        ReconContainerMetadataManager reconContainerMetadataManager)
+      throws IOException {
+
+    long containerCountToIncrement = 0;
+
+    for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo.getKeyLocationVersions()) {
+      long keyVersion = omKeyLocationInfoGroup.getVersion();
+      for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup.getLocationList()) {
+        long containerId = omKeyLocationInfo.getContainerID();
+        ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(containerId, key, keyVersion);
+
+        if (reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix) == 0
+            && !containerKeyMap.containsKey(containerKeyPrefix)) {
+          // Save on writes. No need to save same container-key prefix mapping again.
+          containerKeyMap.put(containerKeyPrefix, 1);
+
+          // Check if container already exists; if not, increment the count
+          if (!reconContainerMetadataManager.doesContainerExists(containerId)
+              && !containerKeyCountMap.containsKey(containerId)) {
+            containerCountToIncrement++;
+          }
+
+          // Update the count of keys for the given containerID. The DB is consulted only when
+          // the container has no in-memory count yet; getOrDefault would evaluate the DB read
+          // for every block location, even when its result is discarded.
+          Long cachedKeyCount = containerKeyCountMap.get(containerId);
+          long keyCount = cachedKeyCount != null
+              ? cachedKeyCount
+              : reconContainerMetadataManager.getKeyCountForContainer(containerId);
+
+          containerKeyCountMap.put(containerId, keyCount + 1);
+        }
+      }
+    }
+
+    if (containerCountToIncrement > 0) {
+      reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement);
+    }
+  }
+
+  /**
+   * Flushes and commits container key mappings to the database.
+   * Both maps are cleared after a successful commit and left untouched on failure.
+   *
+   * @param containerKeyMap               map of (container, key) -> count
+   * @param containerKeyCountMap          map of containerId -> key count
+   * @param reconContainerMetadataManager Recon metadata manager instance
+   * @return true if the data was successfully flushed, false otherwise
+   */
+  public static boolean flushAndCommitContainerKeyInfoToDB(
+      Map<ContainerKeyPrefix, Integer> containerKeyMap,
+      Map<Long, Long> containerKeyCountMap,
+      ReconContainerMetadataManager reconContainerMetadataManager) {
+
+    try {
+      // No deleted container list needed since "reprocess" only has put operations
+      writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList(), reconContainerMetadataManager);
+
+      // Clear in-memory maps after successful commit
+      containerKeyMap.clear();
+      containerKeyCountMap.clear();
+
+    } catch (IOException e) {
+      LOG.error("Unable to write Container Key and Container Key Count data in Recon DB.", e);
+      return false;
+    }
+    return true;
+  }
+
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index e42e021b9e45..e69de29bb2d1 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.recon.tasks; - -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE; -import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE; - -import com.google.inject.Inject; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.ozone.om.OMMetadataManager; -import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; -import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; -import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer; -import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class to iterate over the OM DB and populate the Recon container DB with - * the container -> Key reverse mapping. 
- */ -public class ContainerKeyMapperTask implements ReconOmTask { - - private static final Logger LOG = - LoggerFactory.getLogger(ContainerKeyMapperTask.class); - - private ReconContainerMetadataManager reconContainerMetadataManager; - private final long containerKeyFlushToDBMaxThreshold; - - @Inject - public ContainerKeyMapperTask(ReconContainerMetadataManager - reconContainerMetadataManager, - OzoneConfiguration configuration) { - this.reconContainerMetadataManager = reconContainerMetadataManager; - this.containerKeyFlushToDBMaxThreshold = configuration.getLong( - ReconServerConfigKeys. - OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD, - ReconServerConfigKeys. - OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT - ); - } - - /** - * Read Key -> ContainerId data from OM snapshot DB and write reverse map - * (container, key) -> count to Recon Container DB. - */ - @Override - public TaskResult reprocess(OMMetadataManager omMetadataManager) { - long omKeyCount = 0; - - // In-memory maps for fast look up and batch write - // (container, key) -> count - Map containerKeyMap = new HashMap<>(); - // containerId -> key count - Map containerKeyCountMap = new HashMap<>(); - try { - LOG.debug("Starting a 'reprocess' run of ContainerKeyMapperTask."); - Instant start = Instant.now(); - - // initialize new container DB - reconContainerMetadataManager - .reinitWithNewContainerDataFromOm(new HashMap<>()); - - // loop over both key table and file table - for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY, - BucketLayout.FILE_SYSTEM_OPTIMIZED)) { - // (HDDS-8580) Since "reprocess" iterate over the whole key table, - // containerKeyMap needs to be incrementally flushed to DB based on - // configured batch threshold. - // containerKeyCountMap can be flushed at the end since the number - // of containers in a cluster will not have significant memory overhead. 
- Table omKeyInfoTable = - omMetadataManager.getKeyTable(layout); - try ( - TableIterator> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue kv = keyIter.next(); - OmKeyInfo omKeyInfo = kv.getValue(); - handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap, - containerKeyCountMap); - if (!checkAndCallFlushToDB(containerKeyMap)) { - LOG.error("Unable to flush containerKey information to the DB"); - return buildTaskResult(false); - } - omKeyCount++; - } - } - } - - // flush and commit left out keys at end, - // also batch write containerKeyCountMap to the containerKeyCountTable - if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, - containerKeyCountMap)) { - LOG.error("Unable to flush Container Key Count and " + - "remaining Container Key information to the DB"); - return buildTaskResult(false); - } - - LOG.debug("Completed 'reprocess' of ContainerKeyMapperTask."); - Instant end = Instant.now(); - long duration = Duration.between(start, end).toMillis(); - LOG.debug("It took me {} seconds to process {} keys.", - (double) duration / 1000.0, omKeyCount); - } catch (IOException ioEx) { - LOG.error("Unable to populate Container Key data in Recon DB. 
", - ioEx); - return buildTaskResult(false); - } - return buildTaskResult(true); - } - - private boolean flushAndCommitContainerKeyInfoToDB( - Map containerKeyMap, - Map containerKeyCountMap) { - try { - // deleted container list is not needed since "reprocess" only has - // put operations - writeToTheDB(containerKeyMap, containerKeyCountMap, - Collections.emptyList()); - containerKeyMap.clear(); - containerKeyCountMap.clear(); - } catch (IOException e) { - LOG.error("Unable to write Container Key and " + - "Container Key Count data in Recon DB.", e); - return false; - } - return true; - } - - private boolean checkAndCallFlushToDB( - Map containerKeyMap) { - // if containerKeyMap more than entries, flush to DB and clear the map - if (null != containerKeyMap && containerKeyMap.size() >= - containerKeyFlushToDBMaxThreshold) { - return flushAndCommitContainerKeyInfoToDB(containerKeyMap, - Collections.emptyMap()); - } - return true; - } - - @Override - public String getTaskName() { - return "ContainerKeyMapperTask"; - } - - public Collection getTaskTables() { - List taskTables = new ArrayList<>(); - taskTables.add(KEY_TABLE); - taskTables.add(FILE_TABLE); - return taskTables; - } - - @Override - public TaskResult process(OMUpdateEventBatch events, - Map subTaskSeekPosMap) { - Iterator eventIterator = events.getIterator(); - int eventCount = 0; - final Collection taskTables = getTaskTables(); - - // In-memory maps for fast look up and batch write - // (HDDS-8580) containerKeyMap map is allowed to be used - // in "process" without batching since the maximum number of keys - // is bounded by delta limit configurations - - // (container, key) -> count - Map containerKeyMap = new HashMap<>(); - // containerId -> key count - Map containerKeyCountMap = new HashMap<>(); - // List of the deleted (container, key) pair's - List deletedKeyCountList = new ArrayList<>(); - long startTime = System.currentTimeMillis(); - while (eventIterator.hasNext()) { - OMDBUpdateEvent 
omdbUpdateEvent = eventIterator.next(); - // Filter event inside process method to avoid duping - if (!taskTables.contains(omdbUpdateEvent.getTable())) { - continue; - } - String updatedKey = omdbUpdateEvent.getKey(); - OmKeyInfo updatedKeyValue = omdbUpdateEvent.getValue(); - try { - switch (omdbUpdateEvent.getAction()) { - case PUT: - handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap, - containerKeyCountMap, deletedKeyCountList); - break; - - case DELETE: - handleDeleteOMKeyEvent(updatedKey, containerKeyMap, - containerKeyCountMap, deletedKeyCountList); - break; - - case UPDATE: - if (omdbUpdateEvent.getOldValue() != null) { - handleDeleteOMKeyEvent( - omdbUpdateEvent.getOldValue().getKeyName(), containerKeyMap, - containerKeyCountMap, deletedKeyCountList); - } else { - LOG.warn("Update event does not have the old Key Info for {}.", - updatedKey); - } - handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap, - containerKeyCountMap, deletedKeyCountList); - break; - - default: LOG.debug("Skipping DB update event : {}", - omdbUpdateEvent.getAction()); - } - eventCount++; - } catch (IOException e) { - LOG.error("Unexpected exception while updating key data : {} ", - updatedKey, e); - return buildTaskResult(false); - } - } - try { - writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList); - } catch (IOException e) { - LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); - return buildTaskResult(false); - } - LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.", - getTaskName(), eventCount, (System.currentTimeMillis() - startTime)); - return buildTaskResult(true); - } - - private void writeToTheDB(Map containerKeyMap, - Map containerKeyCountMap, - List deletedContainerKeyList) - throws IOException { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { - containerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> { - try { - reconContainerMetadataManager - 
.batchStoreContainerKeyMapping(rdbBatchOperation, key, - containerKeyMap.get(key)); - } catch (IOException e) { - LOG.error("Unable to write Container Key Prefix data in Recon DB.", - e); - } - }); - - - containerKeyCountMap.keySet().forEach((Long key) -> { - try { - reconContainerMetadataManager - .batchStoreContainerKeyCounts(rdbBatchOperation, key, - containerKeyCountMap.get(key)); - } catch (IOException e) { - LOG.error("Unable to write Container Key Prefix data in Recon DB.", - e); - } - }); - - deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> { - try { - reconContainerMetadataManager - .batchDeleteContainerMapping(rdbBatchOperation, key); - } catch (IOException e) { - LOG.error("Unable to write Container Key Prefix data in Recon DB.", - e); - } - }); - - reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); - } - } - - /** - * Note to delete an OM Key and update the containerID -> no. of keys counts - * (we are preparing for batch deletion in these data structures). - * - * @param key key String. - * @param containerKeyMap we keep the added containerKeys in this map - * (in this batch) - * @param containerKeyCountMap we keep the containerKey counts in this map - * @param deletedContainerKeyList list of the deleted containerKeys - * @throws IOException If Unable to write to container DB. 
- */ - private void handleDeleteOMKeyEvent(String key, - Map - containerKeyMap, - Map containerKeyCountMap, - List - deletedContainerKeyList) - throws IOException { - - Set keysToBeDeleted = new HashSet<>(); - try (TableIterator> keyContainerIterator = - reconContainerMetadataManager.getKeyContainerTableIterator()) { - - // Check if we have keys in this container in the DB - keyContainerIterator.seek(KeyPrefixContainer.get(key)); - while (keyContainerIterator.hasNext()) { - Table.KeyValue keyValue = - keyContainerIterator.next(); - String keyPrefix = keyValue.getKey().getKeyPrefix(); - if (keyPrefix.equals(key)) { - if (keyValue.getKey().getContainerId() != -1) { - keysToBeDeleted.add(keyValue.getKey().toContainerKeyPrefix()); - } - } else { - break; - } - } - } - - // Check if we have keys in this container in our containerKeyMap - containerKeyMap.keySet() - .forEach((ContainerKeyPrefix containerKeyPrefix) -> { - String keyPrefix = containerKeyPrefix.getKeyPrefix(); - if (keyPrefix.equals(key)) { - keysToBeDeleted.add(containerKeyPrefix); - } - }); - - for (ContainerKeyPrefix containerKeyPrefix : keysToBeDeleted) { - deletedContainerKeyList.add(containerKeyPrefix); - // Remove the container-key prefix from the map if we previously added - // it in this batch (and now we delete it) - containerKeyMap.remove(containerKeyPrefix); - - // decrement count and update containerKeyCount. - Long containerID = containerKeyPrefix.getContainerId(); - long keyCount; - if (containerKeyCountMap.containsKey(containerID)) { - keyCount = containerKeyCountMap.get(containerID); - } else { - keyCount = reconContainerMetadataManager - .getKeyCountForContainer(containerID); - } - if (keyCount > 0) { - containerKeyCountMap.put(containerID, --keyCount); - } - } - } - - /** - * Note to add an OM key and update containerID -> no. of keys count. 
- * - * @param key key String - * @param omKeyInfo omKeyInfo value - * @param containerKeyMap we keep the added containerKeys in this map - * (in this batch) - * @param containerKeyCountMap we keep the containerKey counts in this map - * @param deletedContainerKeyList list of the deleted containerKeys - * @throws IOException if unable to write to recon DB. - */ - private void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo, - Map - containerKeyMap, - Map containerKeyCountMap, - List - deletedContainerKeyList) - throws IOException { - long containerCountToIncrement = 0; - for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo - .getKeyLocationVersions()) { - long keyVersion = omKeyLocationInfoGroup.getVersion(); - for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup - .getLocationList()) { - long containerId = omKeyLocationInfo.getContainerID(); - ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get( - containerId, key, keyVersion); - if (reconContainerMetadataManager.getCountForContainerKeyPrefix( - containerKeyPrefix) == 0 - && !containerKeyMap.containsKey(containerKeyPrefix)) { - // Save on writes. No need to save same container-key prefix - // mapping again. 
- containerKeyMap.put(containerKeyPrefix, 1); - // Remove the container-key prefix from the deleted list if we - // previously deleted it in this batch (and now we add it again) - deletedContainerKeyList.remove(containerKeyPrefix); - - - // check if container already exists and - // increment the count of containers if it does not exist - if (!reconContainerMetadataManager.doesContainerExists(containerId) - && !containerKeyCountMap.containsKey(containerId)) { - containerCountToIncrement++; - } - - // update the count of keys for the given containerID - long keyCount; - if (containerKeyCountMap.containsKey(containerId)) { - keyCount = containerKeyCountMap.get(containerId); - } else { - keyCount = reconContainerMetadataManager - .getKeyCountForContainer(containerId); - } - - // increment the count and update containerKeyCount. - // keyCount will be 0 if containerID is not found. So, there is no - // need to initialize keyCount for the first time. - containerKeyCountMap.put(containerId, ++keyCount); - } - } - } - - if (containerCountToIncrement > 0) { - reconContainerMetadataManager - .incrementContainerCountBy(containerCountToIncrement); - } - } - - /** - * Write an OM key to container DB and update containerID -> no. of keys - * count to the Global Stats table. - * - * @param key key String - * @param omKeyInfo omKeyInfo value - * @param containerKeyMap we keep the added containerKeys in this map - * to allow incremental batching to containerKeyTable - * @param containerKeyCountMap we keep the containerKey counts in this map - * to allow batching to containerKeyCountTable - * after reprocessing is done - * @throws IOException if unable to write to recon DB. 
- */ - private void handleKeyReprocess(String key, - OmKeyInfo omKeyInfo, - Map - containerKeyMap, - Map containerKeyCountMap) - throws IOException { - long containerCountToIncrement = 0; - for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo - .getKeyLocationVersions()) { - long keyVersion = omKeyLocationInfoGroup.getVersion(); - for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup - .getLocationList()) { - long containerId = omKeyLocationInfo.getContainerID(); - ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get( - containerId, key, keyVersion); - if (reconContainerMetadataManager.getCountForContainerKeyPrefix( - containerKeyPrefix) == 0 - && !containerKeyMap.containsKey(containerKeyPrefix)) { - // Save on writes. No need to save same container-key prefix - // mapping again. - containerKeyMap.put(containerKeyPrefix, 1); - - // check if container already exists and - // if it exists, update the count of keys for the given containerID - // else, increment the count of containers and initialize keyCount - long keyCount; - if (containerKeyCountMap.containsKey(containerId)) { - keyCount = containerKeyCountMap.get(containerId); - } else { - containerCountToIncrement++; - keyCount = 0; - } - - // increment the count and update containerKeyCount. 
- containerKeyCountMap.put(containerId, ++keyCount); - } - } - } - - if (containerCountToIncrement > 0) { - reconContainerMetadataManager - .incrementContainerCountBy(containerCountToIncrement); - } - } - -} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java new file mode 100644 index 000000000000..5fcd90c46de8 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; + +/** + * Task for processing ContainerKey mapping specifically for FSO buckets. + */ +public class ContainerKeyMapperTaskFSO implements ReconOmTask { + + private final ReconContainerMetadataManager reconContainerMetadataManager; + private final OzoneConfiguration ozoneConfiguration; + + @Inject + public ContainerKeyMapperTaskFSO(ReconContainerMetadataManager reconContainerMetadataManager, + OzoneConfiguration configuration) { + this.reconContainerMetadataManager = reconContainerMetadataManager; + this.ozoneConfiguration = configuration; + } + + @Override + public TaskResult reprocess(OMMetadataManager omMetadataManager) { + long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( + ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD, + ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + boolean result = ContainerKeyMapperHelper.reprocess( + omMetadataManager, reconContainerMetadataManager, + BucketLayout.FILE_SYSTEM_OPTIMIZED, getTaskName(), containerKeyFlushToDBMaxThreshold); + return buildTaskResult(result); + } + + @Override + public String getTaskName() { + return "ContainerKeyMapperTaskFSO"; + } + + @Override + public TaskResult process(OMUpdateEventBatch events, Map subTaskSeekPosMap) { + boolean result = ContainerKeyMapperHelper.process(events, "fileTable", reconContainerMetadataManager, getTaskName()); + return buildTaskResult(result); + } +} diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java new file mode 100644 index 000000000000..e70596ff1c38 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import com.google.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Map; + +/** + * Task for processing ContainerKey mapping specifically for OBS buckets. 
+ */ +public class ContainerKeyMapperTaskOBS implements ReconOmTask { + + private final ReconContainerMetadataManager reconContainerMetadataManager; + private final OzoneConfiguration ozoneConfiguration; + + @Inject + public ContainerKeyMapperTaskOBS(ReconContainerMetadataManager reconContainerMetadataManager, + OzoneConfiguration configuration) { + this.reconContainerMetadataManager = reconContainerMetadataManager; + this.ozoneConfiguration = configuration; + } + + @Override + public TaskResult reprocess(OMMetadataManager omMetadataManager) { + long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( + ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD, + ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + boolean result = ContainerKeyMapperHelper.reprocess( + omMetadataManager, reconContainerMetadataManager, BucketLayout.OBJECT_STORE, getTaskName(), + containerKeyFlushToDBMaxThreshold); + return buildTaskResult(result); + } + + @Override + public String getTaskName() { + return "ContainerKeyMapperTaskOBS"; + } + + @Override + public TaskResult process(OMUpdateEventBatch events, Map subTaskSeekPosMap) { + boolean result = ContainerKeyMapperHelper.process(events, "keyTable", reconContainerMetadataManager, getTaskName()); + return buildTaskResult(result); + } +} diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index 9353f406a779..76566b6f9eac 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -99,7 +99,8 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import 
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; -import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO; import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers; @@ -297,10 +298,13 @@ public void setUp() throws Exception { } private void reprocessContainerKeyMapper() { - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, - omConfiguration); - containerKeyMapperTask.reprocess(reconOMMetadataManager); + ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS = + new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, omConfiguration); + containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager); + + ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO = + new ContainerKeyMapperTaskFSO(reconContainerMetadataManager, omConfiguration); + containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager); } private void setUpFSOData() throws IOException { diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java index aea63b41000e..eb45c6abe847 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java @@ -82,7 +82,7 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; -import 
org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithLegacy; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithOBS; @@ -387,8 +387,8 @@ private void setUpOmData() throws Exception { when(tableMock.getName()).thenReturn("KeyTable"); when(omMetadataManagerMock.getKeyTable(getBucketLayout())) .thenReturn(tableMock); - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, + ContainerKeyMapperTaskOBS containerKeyMapperTask = + new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, ozoneConfiguration); containerKeyMapperTask.reprocess(reconOMMetadataManager); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index 36b335c1b46b..55d0022dad34 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -132,10 +132,10 @@ public void testKeyTableReprocess() throws Exception { VOLUME_NAME, Collections.singletonList(omKeyLocationInfoGroup)); - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, + ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS = + new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, omConfiguration); - containerKeyMapperTask.reprocess(reconOMMetadataManager); + containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager); keyPrefixesForContainer = reconContainerMetadataManager.getKeyPrefixesForContainer(1); @@ -205,10 +205,10 @@ public void testFileTableReprocess() throws Exception { KEY_ONE_SIZE); // 
Reprocess container key mappings - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, + ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO = + new ContainerKeyMapperTaskFSO(reconContainerMetadataManager, omConfiguration); - containerKeyMapperTask.reprocess(reconOMMetadataManager); + containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager); // Check the key prefixes for container 1 keyPrefixesForContainer = @@ -314,10 +314,10 @@ public void testKeyTableProcess() throws IOException { add(keyEvent2); }}, 0L); - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, + ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS = + new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, omConfiguration); - containerKeyMapperTask.reprocess(reconOMMetadataManager); + containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); @@ -336,7 +336,7 @@ public void testKeyTableProcess() throws IOException { assertEquals(1, reconContainerMetadataManager.getKeyCountForContainer(3L)); // Process PUT & DELETE event. 
- containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap()); + containerKeyMapperTaskOBS.process(omUpdateEventBatch, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); @@ -384,8 +384,8 @@ public void testFileTableProcess() throws Exception { new OmKeyLocationInfoGroup(0L, omKeyLocationInfoList); // Reprocess container key mappings - ContainerKeyMapperTask containerKeyMapperTask = - new ContainerKeyMapperTask(reconContainerMetadataManager, + ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO = + new ContainerKeyMapperTaskFSO(reconContainerMetadataManager, omConfiguration); String bucket = BUCKET_NAME; @@ -427,7 +427,7 @@ public void testFileTableProcess() throws Exception { }, 0L); // Process PUT event for both the keys - containerKeyMapperTask.process(omUpdateEventBatch, Collections.emptyMap()); + containerKeyMapperTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); @@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception { }, 0L); // Process DELETE event for key2 - containerKeyMapperTask.process(omUpdateEventBatch2, Collections.emptyMap()); + containerKeyMapperTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); From 2e4aa4ef13d34ad18e1836936655cd9b3e46fac0 Mon Sep 17 00:00:00 2001 From: arafat Date: Tue, 4 Mar 2025 10:51:41 +0530 Subject: [PATCH 02/11] More changes --- .../ozone/recon/spi/impl/ReconDBProvider.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java index e1f7dd21ea4b..c89c0ed34f63 100644 --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java @@ -83,11 +83,16 @@ static void truncateTable(Table table) throws IOException { if (table == null) { return; } - try (TableIterator> - tableIterator = table.iterator()) { - while (tableIterator.hasNext()) { - KeyValue entry = tableIterator.next(); - table.delete(entry.getKey()); + try (TableIterator> iterator = table.iterator()) { + if (iterator.hasNext()) { + Object firstKey = iterator.next().getKey(); + Object lastKey = null; + while (iterator.hasNext()) { + lastKey = iterator.next().getKey(); + } + if (lastKey != null) { + table.deleteRange(firstKey, lastKey); // Efficient bulk deletion + } } } } From 2cbc5fbb2faafc69567f462014ba622f5c2454ee Mon Sep 17 00:00:00 2001 From: arafat Date: Tue, 4 Mar 2025 13:20:34 +0530 Subject: [PATCH 03/11] Fixed checkstyle issues --- .../ozone/recon/tasks/ContainerKeyMapperHelper.java | 2 -- .../hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java | 0 .../ozone/recon/tasks/ContainerKeyMapperTaskFSO.java | 8 +++----- .../ozone/recon/tasks/ContainerKeyMapperTaskOBS.java | 5 +---- 4 files changed, 4 insertions(+), 11 deletions(-) delete mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index 3559c8015b0e..66bcaf1374b5 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.tuple.ImmutablePair; -import 
org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java index 5fcd90c46de8..fd8df1a1e4f4 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java @@ -18,15 +18,12 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Map; /** * Task for processing ContainerKey mapping specifically for FSO buckets. 
@@ -61,7 +58,8 @@ public String getTaskName() { @Override public TaskResult process(OMUpdateEventBatch events, Map subTaskSeekPosMap) { - boolean result = ContainerKeyMapperHelper.process(events, "fileTable", reconContainerMetadataManager, getTaskName()); + boolean result = + ContainerKeyMapperHelper.process(events, "fileTable", reconContainerMetadataManager, getTaskName()); return buildTaskResult(result); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java index e70596ff1c38..178ee8b02867 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java @@ -18,15 +18,12 @@ package org.apache.hadoop.ozone.recon.tasks; import com.google.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Map; /** * Task for processing ContainerKey mapping specifically for OBS buckets. 
From 8ac70df74d4b2d18ee401fb39f384edf5c5665df Mon Sep 17 00:00:00 2001 From: arafat Date: Tue, 4 Mar 2025 14:30:13 +0530 Subject: [PATCH 04/11] Fixed bug --- .../hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index 55d0022dad34..fb31537ec7fb 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -460,7 +460,7 @@ public void testFileTableProcess() throws Exception { }, 0L); // Process DELETE event for key2 - containerKeyMapperTaskFSO.process(omUpdateEventBatch, Collections.emptyMap()); + containerKeyMapperTaskFSO.process(omUpdateEventBatch2, Collections.emptyMap()); keyPrefixesForContainer = reconContainerMetadataManager .getKeyPrefixesForContainer(1); From eb5c4fff4e6137b97b0f40ec26118a80696e9583 Mon Sep 17 00:00:00 2001 From: arafat Date: Tue, 4 Mar 2025 17:31:49 +0530 Subject: [PATCH 05/11] Added new changes --- .../hadoop/ozone/recon/ReconServerConfigKeys.java | 2 +- .../ozone/recon/tasks/ContainerKeyMapperHelper.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index c3fd5bb3592b..ce4baa604790 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -106,7 +106,7 @@ public final class ReconServerConfigKeys { public static final String 
OZONE_RECON_TASK_THREAD_COUNT_KEY = "ozone.recon.task.thread.count"; - public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5; + public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 8; public static final String OZONE_RECON_HTTP_AUTH_CONFIG_PREFIX = "ozone.recon.http.auth."; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index 66bcaf1374b5..64d17c7fe39a 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -66,14 +66,14 @@ public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconCon // Perform table truncation reconContainerMetadataManager.reinitWithNewContainerDataFromOm(new HashMap<>()); LOG.info("Successfully truncated container key tables."); - } catch (IOException e) { + } catch (Exception e) { // Reset the flag so truncation can be retried ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false); LOG.error("Error while truncating container key tables. Resetting flag.", e); throw new RuntimeException("Table truncation failed", e); } } else { - LOG.info("Container key tables already truncated by another task, waiting for truncation to complete."); + LOG.info("Container key tables already truncated by another task."); } } } @@ -114,16 +114,17 @@ public static boolean reprocess(OMMetadataManager omMetadataManager, } } - // Flush and commit changes + // Final flush and commit if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, containerKeyCountMap, reconContainerMetadataManager)) { LOG.error("Failed to flush Container Key data to DB for {}", taskName); return false; } - LOG.info("Completed 'reprocess' for {}. 
Processed {} keys.", taskName, omKeyCount); Instant end = Instant.now(); - long duration = Duration.between(start, end).toMillis(); - LOG.info("Total time: {} seconds.", (double) duration / 1000.0); + long durationMillis = Duration.between(start, end).toMillis(); + double durationSeconds = (double) durationMillis / 1000.0; + LOG.info("Completed 'reprocess' for {}. Processed {} keys in {} ms ({} seconds).", + taskName, omKeyCount, durationMillis, durationSeconds); } catch (IOException ioEx) { LOG.error("Error populating Container Key data for {} in Recon DB.", taskName, ioEx); From 2a164b93d5c7075866f887a175ae385469924a26 Mon Sep 17 00:00:00 2001 From: arafat Date: Wed, 5 Mar 2025 13:07:35 +0530 Subject: [PATCH 06/11] Changed log levels to debug --- .../recon/tasks/ContainerKeyMapperHelper.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index 64d17c7fe39a..1894bf8c34bf 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -59,21 +59,22 @@ public abstract class ContainerKeyMapperHelper { * * @param reconContainerMetadataManager The metadata manager instance responsible for DB operations. 
*/ - public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconContainerMetadataManager) { + public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconContainerMetadataManager, + String taskName) { synchronized (TRUNCATE_LOCK) { if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, true)) { try { // Perform table truncation - reconContainerMetadataManager.reinitWithNewContainerDataFromOm(new HashMap<>()); - LOG.info("Successfully truncated container key tables."); + reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap()); + LOG.debug("Successfully truncated container key tables."); } catch (Exception e) { // Reset the flag so truncation can be retried ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false); - LOG.error("Error while truncating container key tables. Resetting flag.", e); + LOG.error("Error while truncating container key tables for task {}. Resetting flag.", taskName, e); throw new RuntimeException("Table truncation failed", e); } } else { - LOG.info("Container key tables already truncated by another task."); + LOG.debug("Container key tables already truncated by another task."); } } } @@ -88,11 +89,11 @@ public static boolean reprocess(OMMetadataManager omMetadataManager, Map containerKeyCountMap = new HashMap<>(); try { - LOG.info("Starting a 'reprocess' run for {}.", taskName); + LOG.debug("Starting a 'reprocess' run for {}.", taskName); Instant start = Instant.now(); // Ensure the tables are truncated only once - truncateTablesIfNeeded(reconContainerMetadataManager); + truncateTablesIfNeeded(reconContainerMetadataManager, taskName); // Get the appropriate table based on BucketLayout Table omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout); @@ -123,7 +124,7 @@ public static boolean reprocess(OMMetadataManager omMetadataManager, Instant end = Instant.now(); long durationMillis = Duration.between(start, end).toMillis(); double durationSeconds = (double) 
durationMillis / 1000.0; - LOG.info("Completed 'reprocess' for {}. Processed {} keys in {} ms ({} seconds).", + LOG.debug("Completed 'reprocess' for {}. Processed {} keys in {} ms ({} seconds).", taskName, omKeyCount, durationMillis, durationSeconds); } catch (IOException ioEx) { @@ -209,7 +210,7 @@ public static boolean process(OMUpdateEventBatch events, LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); return false; } - LOG.info("{} successfully processed {} OM DB update event(s) in {} milliseconds.", + LOG.debug("{} successfully processed {} OM DB update event(s) in {} milliseconds.", taskName, eventCount, (System.currentTimeMillis() - startTime)); return true; } From cb8faae89a9f491045ab57ed0c103900b85d62f1 Mon Sep 17 00:00:00 2001 From: arafat Date: Mon, 10 Mar 2025 12:33:55 +0530 Subject: [PATCH 07/11] Fixed checkstyle and bugs --- .../ozone/recon/ReconControllerModule.java | 4 --- .../ozone/recon/spi/impl/ReconDBProvider.java | 15 +++------ .../recon/tasks/ContainerKeyMapperHelper.java | 31 ++++++------------- 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index ad2d25a29d4e..b2cbab13deec 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -58,8 +58,6 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; -import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO; -import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO; import 
org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask; @@ -132,8 +130,6 @@ static class ReconOmTaskBindingModule extends AbstractModule { protected void configure() { Multibinder taskBinder = Multibinder.newSetBinder(binder(), ReconOmTask.class); - taskBinder.addBinding().to(ContainerKeyMapperTaskFSO.class); - taskBinder.addBinding().to(ContainerKeyMapperTaskOBS.class); taskBinder.addBinding().to(FileSizeCountTaskFSO.class); taskBinder.addBinding().to(FileSizeCountTaskOBS.class); taskBinder.addBinding().to(OmTableInsightTask.class); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java index c89c0ed34f63..f69bfe263cb8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java @@ -83,16 +83,11 @@ static void truncateTable(Table table) throws IOException { if (table == null) { return; } - try (TableIterator> iterator = table.iterator()) { - if (iterator.hasNext()) { - Object firstKey = iterator.next().getKey(); - Object lastKey = null; - while (iterator.hasNext()) { - lastKey = iterator.next().getKey(); - } - if (lastKey != null) { - table.deleteRange(firstKey, lastKey); // Efficient bulk deletion - } + try (TableIterator> + tableIterator = table.iterator()) { + while (tableIterator.hasNext()) { + KeyValue entry = tableIterator.next(); + table.delete(entry.getKey()); } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index 1894bf8c34bf..e1cffc72ad0b 100644 --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -340,15 +340,6 @@ private static void handleDeleteOMKeyEvent(String key, } } - /** - * Writes container key data to the database. - * - * @param containerKeyMap Map containing container key mappings. - * @param containerKeyCountMap Map containing container key counts. - * @param deletedContainerKeyList List of deleted container keys. - * @param reconContainerMetadataManager Recon metadata manager instance. - * @throws IOException If unable to write to the Recon DB. - */ private static void writeToTheDB(Map containerKeyMap, Map containerKeyCountMap, List deletedContainerKeyList, @@ -393,14 +384,18 @@ private static void writeToTheDB(Map containerKeyMa } /** - * Handles reprocessing of OM keys for container key mapping. + * Write an OM key to container DB and update containerID -> no. of keys + * count to the Global Stats table. * * @param key key String - * @param omKeyInfo OmKeyInfo value - * @param containerKeyMap map of (container, key) -> count - * @param containerKeyCountMap map of containerId -> key count + * @param omKeyInfo omKeyInfo value + * @param containerKeyMap we keep the added containerKeys in this map + * to allow incremental batching to containerKeyTable + * @param containerKeyCountMap we keep the containerKey counts in this map + * to allow batching to containerKeyCountTable + * after reprocessing is done * @param reconContainerMetadataManager Recon metadata manager instance - * @throws IOException if unable to write to Recon DB. + * @throws IOException if unable to write to recon DB. */ public static void handleKeyReprocess(String key, OmKeyInfo omKeyInfo, @@ -442,14 +437,6 @@ public static void handleKeyReprocess(String key, } } - /** - * Flushes and commits container key mappings to the database. 
- * - * @param containerKeyMap map of (container, key) -> count - * @param containerKeyCountMap map of containerId -> key count - * @param reconContainerMetadataManager Recon metadata manager instance - * @return true if the data was successfully flushed, false otherwise - */ public static boolean flushAndCommitContainerKeyInfoToDB( Map containerKeyMap, Map containerKeyCountMap, From f346a41cd650f8f46607c0a579e1f6ef183ae5a5 Mon Sep 17 00:00:00 2001 From: arafat Date: Wed, 12 Mar 2025 09:38:10 +0530 Subject: [PATCH 08/11] Fixed last review comments --- .../org/apache/hadoop/ozone/recon/ReconControllerModule.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index b2cbab13deec..ad2d25a29d4e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -58,6 +58,8 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.spi.impl.ReconNamespaceSummaryManagerImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskFSO; import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask; @@ -130,6 +132,8 @@ static class ReconOmTaskBindingModule extends AbstractModule { protected void configure() { Multibinder taskBinder = Multibinder.newSetBinder(binder(), ReconOmTask.class); + taskBinder.addBinding().to(ContainerKeyMapperTaskFSO.class); + taskBinder.addBinding().to(ContainerKeyMapperTaskOBS.class); 
taskBinder.addBinding().to(FileSizeCountTaskFSO.class); taskBinder.addBinding().to(FileSizeCountTaskOBS.class); taskBinder.addBinding().to(OmTableInsightTask.class); From 03bd42f49a20e663e5ac3892ee1e06855cf10cbe Mon Sep 17 00:00:00 2001 From: arafat Date: Wed, 12 Mar 2025 12:52:43 +0530 Subject: [PATCH 09/11] Fixed java doc issue --- .../java/org/apache/hadoop/ozone/recon/ReconConstants.java | 5 +++-- .../hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java index 8e034c781769..2a831c189e67 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java @@ -70,7 +70,8 @@ private ReconConstants() { public static final String RECON_ACCESS_METADATA_START_DATE = "startDate"; public static final String CONTAINER_COUNT = "CONTAINER_COUNT"; public static final String TOTAL_KEYS = "TOTAL_KEYS"; - public static final String TOTAL_USED_BYTES = "TOTAL_USED_BYTES"; + public static final String TOTAL_USED_BYTES = "TOTAL" + + "_USED_BYTES"; // 1125899906842624L = 1PB public static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L; @@ -98,7 +99,7 @@ private ReconConstants() { public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false); /** - * Resets the table-truncated flag for the given tables. This should be called once per reprocess cycle, + * Resets the table truncated flag for the given tables. This should be called once per reprocess cycle, * for example by the OM task controller, before the tasks run. 
*/ public static void resetTableTruncatedFlags() { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index e1cffc72ad0b..dfe7047196c8 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -384,7 +384,7 @@ private static void writeToTheDB(Map containerKeyMa } /** - * Write an OM key to container DB and update containerID -> no. of keys + * Write an OM key to container DB and update containerID -> no. of keys * count to the Global Stats table. * * @param key key String From 75b2498cc45d331415ab76ebd0e78c10aa38604a Mon Sep 17 00:00:00 2001 From: arafat Date: Wed, 12 Mar 2025 12:53:46 +0530 Subject: [PATCH 10/11] Fixed java doc issue --- .../hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index dfe7047196c8..7e5a02ff99a1 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -216,7 +216,7 @@ public static boolean process(OMUpdateEventBatch events, } /** - * Note to add an OM key and update containerID -> no. of keys count. + * Note to add an OM key and update containerID -> no. of keys count. * * @param key key String * @param omKeyInfo omKeyInfo value @@ -276,7 +276,7 @@ private static void handlePutOMKeyEvent(String key, OmKeyInfo omKeyInfo, } /** - * Note to delete an OM Key and update the containerID -> no. 
of keys counts + * Note to delete an OM Key and update the containerID -> no. of keys counts * (we are preparing for batch deletion in these data structures). * * @param key key String. From 36d34ef116fbbf4209ab8266cc4d2e4caa2e4df7 Mon Sep 17 00:00:00 2001 From: arafat Date: Thu, 13 Mar 2025 00:05:53 +0530 Subject: [PATCH 11/11] Made minor changes --- .../apache/ozone/recon/schema/ContainerSchemaDefinition.java | 5 +++++ .../java/org/apache/hadoop/ozone/recon/ReconConstants.java | 3 +-- .../apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java index 6107c1eb3ada..0fffeb9edeff 100644 --- a/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java +++ b/hadoop-ozone/recon-codegen/src/main/java/org/apache/ozone/recon/schema/ContainerSchemaDefinition.java @@ -29,6 +29,8 @@ import org.jooq.DSLContext; import org.jooq.impl.DSL; import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class used to create tables that are required for tracking containers. 
@@ -38,6 +40,8 @@ public class ContainerSchemaDefinition implements ReconSchemaDefinition { public static final String UNHEALTHY_CONTAINERS_TABLE_NAME = "UNHEALTHY_CONTAINERS"; + private static final Logger LOG = + LoggerFactory.getLogger(ContainerSchemaDefinition.class); /** * ENUM describing the allowed container states which can be stored in the @@ -68,6 +72,7 @@ public void initializeSchema() throws SQLException { Connection conn = dataSource.getConnection(); dslContext = DSL.using(conn); if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) { + LOG.info("UNHEALTHY_CONTAINERS is missing creating new one."); createUnhealthyContainersTable(); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java index 2a831c189e67..6927d77c0331 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java @@ -70,8 +70,7 @@ private ReconConstants() { public static final String RECON_ACCESS_METADATA_START_DATE = "startDate"; public static final String CONTAINER_COUNT = "CONTAINER_COUNT"; public static final String TOTAL_KEYS = "TOTAL_KEYS"; - public static final String TOTAL_USED_BYTES = "TOTAL" + - "_USED_BYTES"; + public static final String TOTAL_USED_BYTES = "TOTAL_USED_BYTES"; // 1125899906842624L = 1PB public static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java index f69bfe263cb8..e1f7dd21ea4b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconDBProvider.java @@ -84,7 
+84,7 @@ static void truncateTable(Table table) throws IOException { return; } try (TableIterator> - tableIterator = table.iterator()) { + tableIterator = table.iterator()) { while (tableIterator.hasNext()) { KeyValue entry = tableIterator.next(); table.delete(entry.getKey());