diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 658a0d67d5ab..aed31414abfd 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4318,6 +4318,43 @@
       recon rocks DB containerKeyTable
     </description>
   </property>
+
+  <property>
+    <name>ozone.recon.filesizecount.flush.db.max.threshold</name>
+    <value>200000</value>
+    <tag>OZONE, RECON, PERFORMANCE</tag>
+    <description>
+      Maximum number of entries the File Size Count task accumulates in memory
+      before flushing them to the Recon DB.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.recon.task.reprocess.max.iterators</name>
+    <value>5</value>
+    <tag>OZONE, RECON, PERFORMANCE</tag>
+    <description>
+      Maximum number of iterator threads used for parallel table iteration during a reprocess run.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.recon.task.reprocess.max.workers</name>
+    <value>20</value>
+    <tag>OZONE, RECON, PERFORMANCE</tag>
+    <description>
+      Maximum number of worker threads used for parallel table processing during a reprocess run.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.recon.task.reprocess.max.keys.in.memory</name>
+    <value>2000</value>
+    <tag>OZONE, RECON, PERFORMANCE</tag>
+    <description>
+      Maximum number of keys to batch in memory before handing them to worker threads during a parallel reprocess run.
+    </description>
+  </property>
 
   <property>
     <name>ozone.recon.heatmap.provider</name>
diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
index 70be91b78624..52998a45c267 100644
--- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
+++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconAndAdminContainerCLI.java
@@ -457,6 +457,8 @@ private static void setupConfigKeys() {
         1, SECONDS);
     CONF.setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, SECONDS);
+    // Configure multiple task threads for concurrent task execution
+    CONF.setInt("ozone.recon.task.thread.count", 6);
     CONF.set(OzoneConfigKeys.OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION, "2s");
     CONF.set(ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL, "2s");
     CONF.set(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, "5s");
diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
index b278b14bc06e..d1d804f3a3a7 100644
--- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
+++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
@@ -65,6 +65,8 @@ public void init() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
         OMConfigKeys.OZONE_BUCKET_LAYOUT_FILE_SYSTEM_OPTIMIZED);
+    // Configure multiple task threads for concurrent task execution
+    conf.setInt("ozone.recon.task.thread.count", 6);
     recon = new ReconService(conf);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 396beb9e3fa3..216610bde673 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -93,7 +93,9 @@ public final class ReconConstants
{ // For file-size count reprocessing: ensure only one task truncates the table. public static final AtomicBoolean FILE_SIZE_COUNT_TABLE_TRUNCATED = new AtomicBoolean(false); - public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false); + // For container key mapper reprocessing: ensure only one task performs initialization + // (truncates tables + clears shared map) + public static final AtomicBoolean CONTAINER_KEY_MAPPER_INITIALIZED = new AtomicBoolean(false); private ReconConstants() { // Never Constructed @@ -105,6 +107,6 @@ private ReconConstants() { */ public static void resetTableTruncatedFlags() { FILE_SIZE_COUNT_TABLE_TRUNCATED.set(false); - CONTAINER_KEY_TABLES_TRUNCATED.set(false); + CONTAINER_KEY_MAPPER_INITIALIZED.set(false); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index d3f684238712..ba357f4ba145 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -162,6 +162,28 @@ public final class ReconServerConfigKeys { public static final long OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L; + public static final String + OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD = + "ozone.recon.filesizecount.flush.db.max.threshold"; + + public static final long + OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 200 * 1000L; + + public static final String + OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS = "ozone.recon.task.reprocess.max.iterators"; + + public static final int OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT = 5; + + public static final String + OZONE_RECON_TASK_REPROCESS_MAX_WORKERS = "ozone.recon.task.reprocess.max.workers"; + + public static final int OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT = 20; + + public static final String + OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY = "ozone.recon.task.reprocess.max.keys.in.memory"; + + public static final int OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT = 2000; + public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY = "ozone.recon.scm.snapshot.task.interval.delay"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 939a3b16df89..ebd49cb46dc4 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -753,11 +753,11 @@ public boolean syncDataFromOM() { fullSnapshotReconTaskUpdater.updateDetails(); // Update the current OM metadata manager in task controller reconTaskController.updateOMMetadataManager(omMetadataManager); - + // Pass on DB update events to tasks that are listening. 
reconTaskController.consumeOMEvents(new OMUpdateEventBatch( omdbUpdatesHandler.getEvents(), omdbUpdatesHandler.getLatestSequenceNumber()), omMetadataManager); - + // Check if task reinitialization is needed due to buffer overflow or task failures boolean bufferOverflowed = reconTaskController.hasEventBufferOverflowed(); boolean tasksFailed = reconTaskController.hasTasksFailed(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index e786aa282a8c..626376ac09a9 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -17,7 +17,9 @@ package org.apache.hadoop.ozone.recon.tasks; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -28,7 +30,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -40,6 +47,7 @@ import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,99 +59,170 @@ public abstract class ContainerKeyMapperHelper { private static final Logger LOG = LoggerFactory.getLogger(ContainerKeyMapperHelper.class); - // Static lock to guard table truncation. - private static final Object TRUNCATE_LOCK = new Object(); + // Single lock to guard all initialization operations (table truncation + map clearing) + private static final Object INITIALIZATION_LOCK = new Object(); /** - * Ensures that the container key tables are truncated only once before reprocessing. - * Uses an AtomicBoolean to track if truncation has already been performed. - * - * @param reconContainerMetadataManager The metadata manager instance responsible for DB operations. + * Reference counter to track how many tasks are actively using the shared map. + * Initialized to 2 (FSO + OBS tasks) during initialization. + * Each task decrements on completion. Last task (count reaches 0) clears the shared map. + */ + private static final AtomicInteger ACTIVE_TASK_COUNT = new AtomicInteger(0); + + /** + * SHARED across all tasks (FSO + OBS) for cross-task synchronization. + * Maps: ContainerId -> AtomicLong (key count in that container) + * Purpose: Prevents data corruption when FSO and OBS tasks run concurrently + * and both write to the same container IDs. Both tasks accumulate into this + * single shared map, ensuring final DB write contains complete totals. 
+ */ + private static final Map SHARED_CONTAINER_KEY_COUNT_MAP = new ConcurrentHashMap<>(); + + /** + * Performs one-time initialization for Container Key Mapper tasks. + * This includes: + * 1. Truncating container key tables in DB + * 2. Clearing the shared container count map + * + * This method is called by both FSO and OBS tasks at the start of reprocess. + * Only the first task to call this will perform initialization. + * + * @param reconContainerMetadataManager The metadata manager for DB operations + * @param taskName Name of the task calling this method (for logging) + * @throws RuntimeException if initialization fails */ - public static void truncateTablesIfNeeded(ReconContainerMetadataManager reconContainerMetadataManager, - String taskName) { - synchronized (TRUNCATE_LOCK) { - if (ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.compareAndSet(false, true)) { + private static void initializeContainerKeyMapperIfNeeded( + ReconContainerMetadataManager reconContainerMetadataManager, + String taskName) { + + synchronized (INITIALIZATION_LOCK) { + ACTIVE_TASK_COUNT.incrementAndGet(); + // Check if already initialized by another task + if (ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.compareAndSet(false, true)) { try { - // Perform table truncation + // Step 1: Truncate tables reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap()); - LOG.debug("Successfully truncated container key tables."); + + // Step 2: Clear shared map + SHARED_CONTAINER_KEY_COUNT_MAP.clear(); + } catch (Exception e) { - // Reset the flag so truncation can be retried - ReconConstants.CONTAINER_KEY_TABLES_TRUNCATED.set(false); - LOG.error("Error while truncating container key tables for task {}. Resetting flag.", taskName, e); - throw new RuntimeException("Table truncation failed", e); + // CRITICAL: Decrement counter and reset flag so another task can retry + ACTIVE_TASK_COUNT.decrementAndGet(); + ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.set(false); + LOG.error("{}: Container Key Mapper initialization failed. 
Resetting flag for retry.", taskName, e); + throw new RuntimeException("Container Key Mapper initialization failed", e); } } else { - LOG.debug("Container key tables already truncated by another task."); + LOG.debug("{}: Container Key Mapper already initialized by another task", taskName); } } } + @SuppressWarnings("checkstyle:ParameterNumber") public static boolean reprocess(OMMetadataManager omMetadataManager, ReconContainerMetadataManager reconContainerMetadataManager, BucketLayout bucketLayout, String taskName, - long containerKeyFlushToDBMaxThreshold) { - long omKeyCount = 0; - Map containerKeyMap = new HashMap<>(); - Map containerKeyCountMap = new HashMap<>(); - + long containerKeyFlushToDBMaxThreshold, + int maxIterators, + int maxWorkers, + int maxKeysInMemory) { try { - LOG.debug("Starting a 'reprocess' run for {}.", taskName); + LOG.info("{}: Starting reprocess for bucket layout {}", taskName, bucketLayout); Instant start = Instant.now(); - // Ensure the tables are truncated only once - truncateTablesIfNeeded(reconContainerMetadataManager, taskName); + // Perform one-time initialization (truncate tables + clear shared map) + initializeContainerKeyMapperIfNeeded(reconContainerMetadataManager, taskName); - // Get the appropriate table based on BucketLayout Table omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout); - // Iterate through the table and process keys - try (TableIterator> keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue kv = keyIter.next(); - handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, containerKeyCountMap, + // Divide threshold by worker count so each worker flushes independently + final long perWorkerThreshold = Math.max(1, containerKeyFlushToDBMaxThreshold / maxWorkers); + + // Map thread IDs to worker-specific local maps for lockless updates + Map> allLocalMaps = new ConcurrentHashMap<>(); + + Function, Void> kvOperation = kv -> { + try { + // Get or create this worker's private local map using thread ID + Map containerKeyPrefixMap = allLocalMaps.computeIfAbsent( + Thread.currentThread().getId(), k -> new ConcurrentHashMap<>()); + + handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyPrefixMap, SHARED_CONTAINER_KEY_COUNT_MAP, reconContainerMetadataManager); - omKeyCount++; - // Check and flush data if it reaches the batch threshold - if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold, + // Flush this worker's map when it reaches threshold + if (containerKeyPrefixMap.size() >= perWorkerThreshold) { + if (!flushAndCommitContainerKeyInfoToDB(containerKeyPrefixMap, Collections.emptyMap(), + reconContainerMetadataManager)) { + throw new UncheckedIOException(new IOException("Unable to flush containerKey information to the DB")); + } + } + return null; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + + try (ParallelTableIteratorOperation keyIter = + new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable, + StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) { + keyIter.performTaskOnTableVals(taskName, null, null, kvOperation); + } + + // Final flush: Write remaining entries from all worker local maps to DB + for (Map containerKeyPrefixMap : allLocalMaps.values()) { + if (!containerKeyPrefixMap.isEmpty()) { + if (!flushAndCommitContainerKeyInfoToDB(containerKeyPrefixMap, Collections.emptyMap(), reconContainerMetadataManager)) { - LOG.error("Failed to flush container key data for {}", taskName); + 
LOG.error("Failed to flush worker local map for {}", taskName); return false; } } } - // Final flush and commit - if (!flushAndCommitContainerKeyInfoToDB(containerKeyMap, containerKeyCountMap, reconContainerMetadataManager)) { - LOG.error("Failed to flush Container Key data to DB for {}", taskName); - return false; + // Decrement active task counter + int remainingTasks = ACTIVE_TASK_COUNT.decrementAndGet(); + LOG.info("{}: Task completed. Remaining active tasks: {}", taskName, remainingTasks); + + // Only last task flushes shared map and writes container count + if (remainingTasks == 0) { + synchronized (INITIALIZATION_LOCK) { + // Capture total container count from shared map + long totalContainers = SHARED_CONTAINER_KEY_COUNT_MAP.size(); + + // Flush shared container count map + if (!flushAndCommitContainerKeyInfoToDB(Collections.emptyMap(), SHARED_CONTAINER_KEY_COUNT_MAP, + reconContainerMetadataManager)) { + LOG.error("Failed to flush shared container count map for {}", taskName); + return false; + } + + // Write total container count once at the end + if (totalContainers > 0) { + reconContainerMetadataManager.incrementContainerCountBy(totalContainers); + } + + // Clean up shared resources + SHARED_CONTAINER_KEY_COUNT_MAP.clear(); + ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.set(false); + LOG.info("{}: Last task completed. Cleared shared map and reset initialization flag.", taskName); + } } Instant end = Instant.now(); long durationMillis = Duration.between(start, end).toMillis(); double durationSeconds = (double) durationMillis / 1000.0; - LOG.debug("Completed 'reprocess' for {}. Processed {} keys in {} ms ({} seconds).", - taskName, omKeyCount, durationMillis, durationSeconds); - } catch (IOException ioEx) { - LOG.error("Error populating Container Key data for {} in Recon DB.", taskName, ioEx); + LOG.info("{}: Reprocess completed in {} sec", taskName, durationSeconds); + } catch (Exception ex) { + LOG.error("Error populating Container Key data for {} in Recon DB.", taskName, ex); return false; } return true; } - private static boolean checkAndCallFlushToDB(Map containerKeyMap, - long containerKeyFlushToDBMaxThreshold, - ReconContainerMetadataManager reconContainerMetadataManager) { - if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) { - return flushAndCommitContainerKeyInfoToDB(containerKeyMap, Collections.emptyMap(), reconContainerMetadataManager); - } - return true; - } - public static boolean process(OMUpdateEventBatch events, String tableName, ReconContainerMetadataManager reconContainerMetadataManager, @@ -152,14 +231,14 @@ public static boolean process(OMUpdateEventBatch events, int eventCount = 0; // In-memory maps for fast look up and batch write - // (HDDS-8580) containerKeyMap map is allowed to be used + // (HDDS-8580) localContainerKeyMap map is allowed to be used // in "process" without batching since the maximum number of keys // is bounded by delta limit configurations - // (container, key) -> count - Map containerKeyMap = new HashMap<>(); - // containerId -> key count - Map containerKeyCountMap = new HashMap<>(); + // Local map: (container, key) -> count (per event batch) + Map localContainerKeyMap = new HashMap<>(); + // Local map: containerId -> key count (per event batch) + Map localContainerKeyCountMap = new HashMap<>(); // List of the deleted (container, key) pair's List deletedKeyCountList = new ArrayList<>(); long startTime = Time.monotonicNow(); @@ -175,25 +254,25 @@ public static boolean process(OMUpdateEventBatch events, try { switch 
(omdbUpdateEvent.getAction()) { case PUT: - handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap, - containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); + handlePutOMKeyEvent(updatedKey, updatedKeyValue, localContainerKeyMap, + localContainerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); break; case DELETE: - handleDeleteOMKeyEvent(updatedKey, containerKeyMap, - containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); + handleDeleteOMKeyEvent(updatedKey, localContainerKeyMap, + localContainerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); break; case UPDATE: if (omdbUpdateEvent.getOldValue() != null) { handleDeleteOMKeyEvent( - omdbUpdateEvent.getOldValue().getKeyName(), containerKeyMap, - containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); + omdbUpdateEvent.getOldValue().getKeyName(), localContainerKeyMap, + localContainerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); } else { LOG.warn("Update event does not have the old Key Info for {}.", updatedKey); } - handlePutOMKeyEvent(updatedKey, updatedKeyValue, containerKeyMap, - containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); + handlePutOMKeyEvent(updatedKey, updatedKeyValue, localContainerKeyMap, + localContainerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); break; default: @@ -206,7 +285,11 @@ public static boolean process(OMUpdateEventBatch events, } } try { - writeToTheDB(containerKeyMap, containerKeyCountMap, deletedKeyCountList, reconContainerMetadataManager); + // Convert local Long map to AtomicLong map for writeToTheDB compatibility + Map localContainerKeyCountMapAtomic = new ConcurrentHashMap<>(); + localContainerKeyCountMap.forEach((k, v) -> localContainerKeyCountMapAtomic.put(k, new AtomicLong(v))); + writeToTheDB(localContainerKeyMap, localContainerKeyCountMapAtomic, deletedKeyCountList, + reconContainerMetadataManager); } catch (IOException e) { LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); return false; @@ -341,29 +424,30 @@ private static void handleDeleteOMKeyEvent(String key, } } - private static void writeToTheDB(Map containerKeyMap, - Map containerKeyCountMap, + private static void writeToTheDB(Map localContainerKeyMap, + Map containerKeyCountMap, List deletedContainerKeyList, ReconContainerMetadataManager reconContainerMetadataManager) throws IOException { try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { - // Write container key mappings - containerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> { + // Write container key mappings (local per-task data) + localContainerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> { try { reconContainerMetadataManager.batchStoreContainerKeyMapping( - rdbBatchOperation, key, containerKeyMap.get(key)); + rdbBatchOperation, key, localContainerKeyMap.get(key)); } catch (IOException e) { LOG.error("Unable to write Container Key Prefix data in Recon DB.", e); } }); - // Write container key count mappings + // Write container key count mappings (can be local or shared depending on caller) containerKeyCountMap.keySet().forEach((Long key) -> { try { + long count = containerKeyCountMap.get(key).get(); // Get value from AtomicLong reconContainerMetadataManager.batchStoreContainerKeyCounts( - rdbBatchOperation, key, containerKeyCountMap.get(key)); + rdbBatchOperation, key, count); } catch (IOException e) { LOG.error("Unable to write Container Key Count data in Recon DB.", 
e); } @@ -390,66 +474,52 @@ private static void writeToTheDB(Map containerKeyMa * * @param key key String * @param omKeyInfo omKeyInfo value - * @param containerKeyMap we keep the added containerKeys in this map - * to allow incremental batching to containerKeyTable - * @param containerKeyCountMap we keep the containerKey counts in this map - * to allow batching to containerKeyCountTable - * after reprocessing is done + * @param localContainerKeyMap Local per-task map for ContainerKeyPrefix mappings + * (cleared on flush, not shared between tasks) + * @param sharedContainerKeyCountMap Shared cross-task map for container counts + * (FSO + OBS both update this, uses AtomicLong for thread safety) * @param reconContainerMetadataManager Recon metadata manager instance * @throws IOException if unable to write to recon DB. */ public static void handleKeyReprocess(String key, OmKeyInfo omKeyInfo, - Map containerKeyMap, - Map containerKeyCountMap, + Map localContainerKeyMap, + Map sharedContainerKeyCountMap, ReconContainerMetadataManager reconContainerMetadataManager) throws IOException { - long containerCountToIncrement = 0; - for (OmKeyLocationInfoGroup omKeyLocationInfoGroup : omKeyInfo.getKeyLocationVersions()) { long keyVersion = omKeyLocationInfoGroup.getVersion(); for (OmKeyLocationInfo omKeyLocationInfo : omKeyLocationInfoGroup.getLocationList()) { long containerId = omKeyLocationInfo.getContainerID(); ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get(containerId, key, keyVersion); - if (reconContainerMetadataManager.getCountForContainerKeyPrefix(containerKeyPrefix) == 0 - && !containerKeyMap.containsKey(containerKeyPrefix)) { + // During reprocess, tables are empty so skip DB lookup - just check in-memory map + if (!localContainerKeyMap.containsKey(containerKeyPrefix)) { // Save on writes. No need to save same container-key prefix mapping again. 
- containerKeyMap.put(containerKeyPrefix, 1); + localContainerKeyMap.put(containerKeyPrefix, 1); - // Check if container already exists; if not, increment the count - if (!reconContainerMetadataManager.doesContainerExists(containerId) - && !containerKeyCountMap.containsKey(containerId)) { - containerCountToIncrement++; - } - - // Update the count of keys for the given containerID - long keyCount = containerKeyCountMap.getOrDefault(containerId, - reconContainerMetadataManager.getKeyCountForContainer(containerId)); - - containerKeyCountMap.put(containerId, keyCount + 1); + // Thread-safe increment using computeIfAbsent (cross-task safe: FSO + OBS) + sharedContainerKeyCountMap.computeIfAbsent(containerId, k -> new AtomicLong(0)) + .incrementAndGet(); } } } - - if (containerCountToIncrement > 0) { - reconContainerMetadataManager.incrementContainerCountBy(containerCountToIncrement); - } + // Container count will be written once at the end of reprocess, not here (Derby optimization) } public static boolean flushAndCommitContainerKeyInfoToDB( - Map containerKeyMap, - Map containerKeyCountMap, + Map localContainerKeyMap, + Map sharedContainerKeyCountMap, ReconContainerMetadataManager reconContainerMetadataManager) { try { // No deleted container list needed since "reprocess" only has put operations - writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList(), reconContainerMetadataManager); + writeToTheDB(localContainerKeyMap, sharedContainerKeyCountMap, Collections.emptyList(), + reconContainerMetadataManager); - // Clear in-memory maps after successful commit - containerKeyMap.clear(); - containerKeyCountMap.clear(); + // Only clear localContainerKeyMap (per-task), keep sharedContainerKeyCountMap for other tasks + localContainerKeyMap.clear(); } catch (IOException e) { LOG.error("Unable to write Container Key and Container Key Count data in Recon DB.", e); @@ -458,4 +528,18 @@ public static boolean flushAndCommitContainerKeyInfoToDB( return true; } + /** + * Clears the shared container count map and resets initialization flag. + * This method should be called by tests to ensure clean state between test runs. 
+ */ + @VisibleForTesting + public static void clearSharedContainerCountMap() { + synchronized (INITIALIZATION_LOCK) { + SHARED_CONTAINER_KEY_COUNT_MAP.clear(); + ReconConstants.CONTAINER_KEY_MAPPER_INITIALIZED.set(false); + ACTIVE_TASK_COUNT.set(0); + LOG.debug("Cleared shared container count map, reset initialization flag, and reset task counter for tests"); + } + } + } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java index 60499ac33c8a..9b3374248bd3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskFSO.java @@ -55,9 +55,19 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD, ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + int maxKeysInMemory = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + int maxIterators = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); + int maxWorkers = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT); boolean result = ContainerKeyMapperHelper.reprocess( omMetadataManager, reconContainerMetadataManager, - BucketLayout.FILE_SYSTEM_OPTIMIZED, getTaskName(), containerKeyFlushToDBMaxThreshold); + BucketLayout.FILE_SYSTEM_OPTIMIZED, getTaskName(), containerKeyFlushToDBMaxThreshold, + maxIterators, maxWorkers, maxKeysInMemory); return buildTaskResult(result); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java index e69601040d60..47c19d15b9ed 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTaskOBS.java @@ -55,9 +55,18 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { long containerKeyFlushToDBMaxThreshold = ozoneConfiguration.getLong( ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD, ReconServerConfigKeys.OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + int maxKeysInMemory = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + int maxIterators = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); + int maxWorkers = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT); boolean result = ContainerKeyMapperHelper.reprocess( omMetadataManager, reconContainerMetadataManager, 
BucketLayout.OBJECT_STORE, getTaskName(), - containerKeyFlushToDBMaxThreshold); + containerKeyFlushToDBMaxThreshold, maxIterators, maxWorkers, maxKeysInMemory); return buildTaskResult(result); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java index f315ddf3e862..54e375ae0d6f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/DeletedKeysInsightHandler.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,19 +113,18 @@ public void handleUpdateEvent(OMDBUpdateEvent event, * pending deletion in Ozone. */ @Override - public Triple getTableSizeAndCount( - TableIterator> iterator) - throws IOException { + public Triple getTableSizeAndCount(String tableName, + OMMetadataManager omMetadataManager) throws IOException { long count = 0; long unReplicatedSize = 0; long replicatedSize = 0; - if (iterator != null) { + Table table = omMetadataManager.getDeletedTable(); + try (TableIterator> iterator = table.iterator()) { while (iterator.hasNext()) { - Table.KeyValue kv = iterator.next(); + Table.KeyValue kv = iterator.next(); if (kv != null && kv.getValue() != null) { - RepeatedOmKeyInfo repeatedOmKeyInfo = (RepeatedOmKeyInfo) kv - .getValue(); + RepeatedOmKeyInfo repeatedOmKeyInfo = kv.getValue(); Pair result = repeatedOmKeyInfo.getTotalSize(); unReplicatedSize += result.getRight(); replicatedSize += result.getLeft(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java index 7a13fea4f989..229225de3385 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskFSO.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager; @@ -53,11 +54,27 @@ public ReconOmTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { + int maxKeysInMemory = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + int maxIterators = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); + int maxWorkers = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT); + long fileSizeCountFlushThreshold = 
ozoneConfiguration.getLong( + ReconServerConfigKeys.OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD, + ReconServerConfigKeys.OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); return FileSizeCountTaskHelper.reprocess( omMetadataManager, reconFileMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED, - getTaskName() + getTaskName(), + maxIterators, + maxWorkers, + maxKeysInMemory, + fileSizeCountFlushThreshold ); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java index dfc579d8156a..b82fcd556a1b 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java @@ -17,19 +17,21 @@ package org.apache.hadoop.ozone.recon.tasks; -import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.recon.ReconConstants; import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager; +import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +50,8 @@ public abstract class FileSizeCountTaskHelper { */ public static void handlePutKeyEvent(OmKeyInfo omKeyInfo, Map fileSizeCountMap) { - FileSizeCountKey key = getFileSizeCountKey(omKeyInfo); - Long count = fileSizeCountMap.containsKey(key) ? fileSizeCountMap.get(key) + 1L : 1L; - fileSizeCountMap.put(key, count); + fileSizeCountMap.compute(getFileSizeCountKey(omKeyInfo), + (k, v) -> (v == null ? 0L : v) + 1L); } /** @@ -61,9 +62,8 @@ public static void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo, if (omKeyInfo == null) { LOG.warn("Deleting a key not found while handling DELETE key event. Key not found in Recon OM DB: {}", key); } else { - FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo); - Long count = fileSizeCountMap.containsKey(countKey) ? fileSizeCountMap.get(countKey) - 1L : -1L; - fileSizeCountMap.put(countKey, count); + fileSizeCountMap.compute(getFileSizeCountKey(omKeyInfo), + (k, v) -> (v == null ? 0L : v) - 1L); } } @@ -102,63 +102,101 @@ public static void truncateFileCountTableIfNeeded(ReconFileMetadataManager recon /** * Executes the reprocess method using RocksDB for the given task. 
*/ + @SuppressWarnings("checkstyle:ParameterNumber") public static ReconOmTask.TaskResult reprocess(OMMetadataManager omMetadataManager, ReconFileMetadataManager reconFileMetadataManager, BucketLayout bucketLayout, - String taskName) { - LOG.info("Starting RocksDB Reprocess for {}", taskName); - Map fileSizeCountMap = new HashMap<>(); - long startTime = Time.monotonicNow(); + String taskName, + int maxIterators, + int maxWorkers, + int maxKeysInMemory, + long fileSizeCountFlushThreshold) { + LOG.info("{}: Starting reprocess for bucket layout {}", taskName, bucketLayout); + Map fileSizeCountMap = new ConcurrentHashMap<>(); + long overallStartTime = Time.monotonicNow(); // Ensure the file count table is truncated only once during reprocess truncateFileCountTableIfNeeded(reconFileMetadataManager, taskName); boolean status = reprocessBucketLayout( - bucketLayout, omMetadataManager, fileSizeCountMap, reconFileMetadataManager, taskName); + bucketLayout, omMetadataManager, fileSizeCountMap, reconFileMetadataManager, taskName, + maxIterators, maxWorkers, maxKeysInMemory, fileSizeCountFlushThreshold); if (!status) { return buildTaskResult(taskName, false); } + // Write remaining counts to DB (no global lock needed - FSO and OBS are mutually exclusive) writeCountsToDB(fileSizeCountMap, reconFileMetadataManager); - - long endTime = Time.monotonicNow(); - LOG.info("{} completed RocksDB Reprocess in {} ms.", taskName, (endTime - startTime)); + + long totalDurationMs = Time.monotonicNow() - overallStartTime; + double durationSeconds = (double) totalDurationMs / 1000.0; + + LOG.info("{}: Reprocess completed in {} sec", taskName, durationSeconds); return buildTaskResult(taskName, true); } /** - * Iterates over the OM DB keys for the given bucket layout and updates the fileSizeCountMap (RocksDB version). + * Iterates over the OM DB keys for the given bucket layout using lockless per-worker maps. + * Each worker maintains its own map to eliminate read lock contention. */ + @SuppressWarnings("checkstyle:ParameterNumber") public static boolean reprocessBucketLayout(BucketLayout bucketLayout, OMMetadataManager omMetadataManager, Map fileSizeCountMap, ReconFileMetadataManager reconFileMetadataManager, - String taskName) { + String taskName, + int maxIterators, + int maxWorkers, + int maxKeysInMemory, + long fileSizeCountFlushThreshold) { Table omKeyInfoTable = omMetadataManager.getKeyTable(bucketLayout); - int totalKeysProcessed = 0; - - try (TableIterator> keyIter = - omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue kv = keyIter.next(); - handlePutKeyEvent(kv.getValue(), fileSizeCountMap); - totalKeysProcessed++; - // Flush to RocksDB periodically. 
- if (fileSizeCountMap.size() >= 100000) { - // For reprocess, we don't need to check existing values since table was truncated - LOG.debug("Flushing {} accumulated counts to RocksDB for {}", fileSizeCountMap.size(), taskName); - writeCountsToDB(fileSizeCountMap, reconFileMetadataManager); - fileSizeCountMap.clear(); + // Divide threshold by worker count so each worker flushes independently + final long perWorkerThreshold = Math.max(1, fileSizeCountFlushThreshold / maxWorkers); + + // Map thread IDs to worker-specific maps for lockless updates + Map> allMap = new ConcurrentHashMap<>(); + + // Lock for coordinating DB flush operations only + Object flushLock = new Object(); + + // Lambda executed by workers for each key + Function, Void> kvOperation = kv -> { + // Get or create this worker's private map using thread ID + Map workerFileSizeCountMap = allMap.computeIfAbsent( + Thread.currentThread().getId(), k -> new HashMap<>()); + + // Update worker's private map without locks + handlePutKeyEvent(kv.getValue(), workerFileSizeCountMap); + + // Flush this worker's map when it reaches threshold + if (workerFileSizeCountMap.size() >= perWorkerThreshold) { + synchronized (flushLock) { + writeCountsToDB(workerFileSizeCountMap, reconFileMetadataManager); + workerFileSizeCountMap.clear(); } } - } catch (IOException ioEx) { - LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ioEx); + return null; + }; + + try (ParallelTableIteratorOperation keyIter = + new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable, + StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) { + keyIter.performTaskOnTableVals(taskName, null, null, kvOperation); + } catch (Exception ex) { + LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex); return false; } - LOG.info("Reprocessed {} keys for bucket layout {} using RocksDB.", totalKeysProcessed, bucketLayout); + // Final flush: Write remaining entries from all worker maps to DB + for (Map workerFileSizeCountMap : allMap.values()) { + if (!workerFileSizeCountMap.isEmpty()) { + writeCountsToDB(workerFileSizeCountMap, reconFileMetadataManager); + workerFileSizeCountMap.clear(); + } + } + return true; } @@ -216,6 +254,7 @@ public static ReconOmTask.TaskResult processEvents(OMUpdateEventBatch events, } } + // Write remaining counts to DB (no lock needed for incremental processing) writeCountsToDB(fileSizeCountMap, reconFileMetadataManager); LOG.debug("{} successfully processed using RocksDB in {} milliseconds", taskName, @@ -225,68 +264,34 @@ public static ReconOmTask.TaskResult processEvents(OMUpdateEventBatch events, /** * Writes the accumulated file size counts to RocksDB using ReconFileMetadataManager. + * + * Thread Safety: FSO and OBS tasks write to different bucket keys (mutually exclusive), + * so no global lock is needed. RocksDB handles concurrent writes to different keys safely. */ - /** - * Checks if the file count table is empty by trying to get the first entry. - * This mimics the SQL Derby behavior of isFileCountBySizeTableEmpty(). 
- */ - private static boolean isFileCountTableEmpty(ReconFileMetadataManager reconFileMetadataManager) { - try (TableIterator> iterator = - reconFileMetadataManager.getFileCountTable().iterator()) { - return !iterator.hasNext(); - } catch (Exception e) { - LOG.warn("Error checking if file count table is empty, assuming not empty", e); - return false; - } - } - public static void writeCountsToDB(Map fileSizeCountMap, ReconFileMetadataManager reconFileMetadataManager) { if (fileSizeCountMap.isEmpty()) { return; } - - boolean isTableEmpty = isFileCountTableEmpty(reconFileMetadataManager); - - LOG.debug("writeCountsToDB: processing {} entries, isTableEmpty={}", - fileSizeCountMap.size(), isTableEmpty); try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { for (Map.Entry entry : fileSizeCountMap.entrySet()) { FileSizeCountKey key = entry.getKey(); Long deltaCount = entry.getValue(); - - LOG.debug("Processing key: {}, deltaCount: {}", key, deltaCount); - - if (isTableEmpty) { - // Direct insert when table is empty (like SQL Derby reprocess behavior) - LOG.debug("Direct insert (table empty): key={}, deltaCount={}", key, deltaCount); - if (deltaCount > 0L) { - reconFileMetadataManager.batchStoreFileSizeCount(rdbBatchOperation, key, deltaCount); - LOG.debug("Storing key={} with deltaCount={}", key, deltaCount); - } - } else { - // Incremental update when table has data (like SQL Derby incremental behavior) - Long existingCount = reconFileMetadataManager.getFileSizeCount(key); - Long newCount = (existingCount != null ? existingCount : 0L) + deltaCount; - - LOG.debug("Incremental update: key={}, existingCount={}, deltaCount={}, newCount={}", - key, existingCount, deltaCount, newCount); - - if (newCount > 0L) { - reconFileMetadataManager.batchStoreFileSizeCount(rdbBatchOperation, key, newCount); - LOG.debug("Storing key={} with newCount={}", key, newCount); - } else if (existingCount != null) { - // Delete key if count becomes 0 or negative - reconFileMetadataManager.batchDeleteFileSizeCount(rdbBatchOperation, key); - LOG.debug("Deleting key={} as newCount={} <= 0", key, newCount); - } + + // Read-modify-write: Read current count, add delta, write back + Long existingCount = reconFileMetadataManager.getFileSizeCount(key); + Long newCount = (existingCount != null ? 
existingCount : 0L) + deltaCount; + + if (newCount > 0L) { + reconFileMetadataManager.batchStoreFileSizeCount(rdbBatchOperation, key, newCount); + } else if (existingCount != null) { + // Delete key if count becomes 0 or negative + reconFileMetadataManager.batchDeleteFileSizeCount(rdbBatchOperation, key); } } - - LOG.debug("Committing batch operation with {} operations", fileSizeCountMap.size()); + reconFileMetadataManager.commitBatchOperation(rdbBatchOperation); - LOG.debug("Batch operation committed successfully"); } catch (Exception e) { LOG.error("Error writing file size counts to RocksDB", e); throw new RuntimeException("Failed to write to RocksDB", e); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java index fc192eca0422..118edb6a1125 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskOBS.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager; @@ -53,11 +54,27 @@ public ReconOmTask getStagedTask(ReconOMMetadataManager stagedOmMetadataManager, @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { + int maxKeysInMemory = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + int maxIterators = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); + int maxWorkers = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT); + long fileSizeCountFlushThreshold = ozoneConfiguration.getLong( + ReconServerConfigKeys.OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD, + ReconServerConfigKeys.OZONE_RECON_FILESIZECOUNT_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); return FileSizeCountTaskHelper.reprocess( omMetadataManager, reconFileMetadataManager, BucketLayout.OBJECT_STORE, - getTaskName() + getTaskName(), + maxIterators, + maxWorkers, + maxKeysInMemory, + fileSizeCountFlushThreshold ); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java index 3976919ab4b1..828192ec1277 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/MultipartInfoInsightHandler.java @@ -17,10 +17,12 @@ package org.apache.hadoop.ozone.recon.tasks; +import java.io.IOException; import java.util.Map; import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; 
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo; import org.apache.hadoop.ozone.recon.api.types.ReconBasicOmKeyInfo; @@ -148,17 +150,19 @@ public void handleUpdateEvent(OMDBUpdateEvent event, String tabl * uploads in the backend. */ @Override - public Triple getTableSizeAndCount( - TableIterator> iterator) { + public Triple getTableSizeAndCount(String tableName, + OMMetadataManager omMetadataManager) throws IOException { long count = 0; long unReplicatedSize = 0; long replicatedSize = 0; - if (iterator != null) { + Table table = + (Table) omMetadataManager.getTable(tableName); + try (TableIterator> iterator = table.iterator()) { while (iterator.hasNext()) { - Table.KeyValue kv = iterator.next(); + Table.KeyValue kv = iterator.next(); if (kv != null && kv.getValue() != null) { - OmMultipartKeyInfo multipartKeyInfo = (OmMultipartKeyInfo) kv.getValue(); + OmMultipartKeyInfo multipartKeyInfo = kv.getValue(); for (PartKeyInfo partKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) { ReconBasicOmKeyInfo omKeyInfo = ReconBasicOmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo()); unReplicatedSize += omKeyInfo.getDataSize(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableHandler.java index 401a3c7dc099..7e6fa5b9fc89 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableHandler.java @@ -20,8 +20,7 @@ import java.io.IOException; import java.util.Map; import org.apache.commons.lang3.tuple.Triple; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; /** * Interface for handling PUT, DELETE and UPDATE events for size-related @@ -82,19 +81,18 @@ void handleUpdateEvent(OMDBUpdateEvent event, /** * Returns a triple with the total count of records (left), total unreplicated - * size (middle), and total replicated size (right) in the given iterator. + * size (middle), and total replicated size (right) for the given table. * Increments count for each record and adds the dataSize if a record's value * is an instance of OmKeyInfo,RepeatedOmKeyInfo. - * If the iterator is null, returns (0, 0, 0). * - * @param iterator The iterator over the table to be iterated. + * @param tableName The name of the table to process. + * @param omMetadataManager The OM metadata manager to get the table. * @return A Triple with three Long values representing the count, * unReplicated size and replicated size. * @throws IOException If an I/O error occurs during the iterator traversal. */ - Triple getTableSizeAndCount( - TableIterator> iterator) - throws IOException; + Triple getTableSizeAndCount(String tableName, + OMMetadataManager omMetadataManager) throws IOException; /** * Returns the count key for the given table. 
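The reprocess changes in ContainerKeyMapperHelper and FileSizeCountTaskHelper above share one concurrency pattern: each worker thread accumulates into a private map keyed by its own thread id (the allLocalMaps / allMap fields), and cross-thread container totals go into a shared ConcurrentHashMap of AtomicLong counters (SHARED_CONTAINER_KEY_COUNT_MAP) that is flushed once at the end. The following standalone sketch illustrates that pattern with plain JDK primitives only; the class name, key strings, and pool size are illustrative and are not part of this patch.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public final class PerWorkerAccumulationSketch {

  public static void main(String[] args) throws InterruptedException {
    // Shared across all workers (and, in the patch, across the FSO and OBS tasks):
    // containerId -> number of keys seen, incremented without external locking.
    Map<Long, AtomicLong> sharedCounts = new ConcurrentHashMap<>();

    // One private map per worker thread, keyed by thread id, so the hot path never locks.
    Map<Long, Map<String, Integer>> perWorkerMaps = new ConcurrentHashMap<>();

    List<String> keys = Arrays.asList("vol/bucket/k1", "vol/bucket/k2", "vol/bucket/k3", "vol/bucket/k4");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch done = new CountDownLatch(keys.size());

    for (String key : keys) {
      long containerId = Math.abs(key.hashCode()) % 2;  // stand-in for a real container id
      pool.submit(() -> {
        // Worker-private accumulation (mirrors the thread-id-keyed local maps above).
        Map<String, Integer> local = perWorkerMaps.computeIfAbsent(
            Thread.currentThread().getId(), id -> new HashMap<>());
        local.put(key, 1);

        // Cross-thread total (mirrors SHARED_CONTAINER_KEY_COUNT_MAP).
        sharedCounts.computeIfAbsent(containerId, id -> new AtomicLong(0)).incrementAndGet();
        done.countDown();
      });
    }

    done.await();
    pool.shutdown();

    // "Final flush": each worker map is drained once, and shared totals are read once at the end.
    perWorkerMaps.forEach((threadId, local) ->
        System.out.println("worker " + threadId + " accumulated " + local.size() + " entries"));
    sharedCounts.forEach((containerId, count) ->
        System.out.println("container " + containerId + " -> " + count.get() + " keys"));
  }
}

In the patch itself, the same idea runs inside the ParallelTableIteratorOperation callbacks, with the thread counts controlled by the new ozone.recon.task.reprocess.* configuration keys.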
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java index 341912b5d2e0..8027966231de 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java @@ -32,14 +32,20 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.tuple.Triple; +import org.apache.hadoop.hdds.utils.db.ByteArrayCodec; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.cache.TableCache; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager; +import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +65,8 @@ public class OmTableInsightTask implements ReconOmTask { private Map objectCountMap; private Map unReplicatedSizeMap; private Map replicatedSizeMap; + private final int maxKeysInMemory; + private final int maxIterators; @Inject public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager, @@ -72,6 +80,12 @@ public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager, tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler()); tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler()); tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler()); + this.maxKeysInMemory = reconOMMetadataManager.getOzoneConfiguration().getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + this.maxIterators = reconOMMetadataManager.getOzoneConfiguration().getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); } @Override @@ -97,39 +111,36 @@ public void init() { } /** - * Iterates the rows of each table in the OM snapshot DB and calculates the - * counts and sizes for table data. - *

- * For tables that require data size calculation - * (as returned by getTablesToCalculateSize), both the number of - * records (count) and total data size of the records are calculated. - * For all other tables, only the count of records is calculated. + * Reprocess all OM tables to calculate counts and sizes. + * Handler tables (with size calculation) use sequential iteration. + * Simple tables (count only) use parallel iteration with String keys, + * or sequential for non-String key tables. * - * @param omMetadataManager OM Metadata instance. - * @return Pair + * @param omMetadataManager OM Metadata instance + * @return TaskResult indicating success or failure */ @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { + LOG.info("{}: Starting reprocess", getTaskName()); + long startTime = Time.monotonicNow(); + init(); for (String tableName : tables) { - Table table = omMetadataManager.getTable(tableName); - - try (TableIterator> iterator - = table.iterator()) { + try { if (tableHandlers.containsKey(tableName)) { - Triple details = - tableHandlers.get(tableName).getTableSizeAndCount(iterator); - objectCountMap.put(getTableCountKeyFromTable(tableName), - details.getLeft()); - unReplicatedSizeMap.put( - getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle()); - replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName), - details.getRight()); + Triple details = + tableHandlers.get(tableName).getTableSizeAndCount(tableName, omMetadataManager); + objectCountMap.put(getTableCountKeyFromTable(tableName), details.getLeft()); + unReplicatedSizeMap.put(getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle()); + replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName), details.getRight()); } else { - long count = Iterators.size(iterator); - objectCountMap.put(getTableCountKeyFromTable(tableName), count); + if (usesNonStringKeys(tableName)) { + processTableSequentially(tableName, omMetadataManager); + } else { + processTableInParallel(tableName, omMetadataManager); + } } - } catch (IOException ioEx) { + } catch (Exception ioEx) { LOG.error("Unable to populate Table Count in Recon DB.", ioEx); return buildTaskResult(false); } @@ -144,11 +155,79 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { if (!replicatedSizeMap.isEmpty()) { writeDataToDB(replicatedSizeMap); } + long endTime = Time.monotonicNow(); + long durationMs = endTime - startTime; - LOG.debug("Completed a 'reprocess' run of OmTableInsightTask."); + LOG.info("{}: Reprocess completed in {} ms", getTaskName(), durationMs); return buildTaskResult(true); } + /** + * Check if table uses non-String keys (e.g., OzoneTokenIdentifier). + * These tables cannot use StringCodec and must be processed sequentially. + */ + private boolean usesNonStringKeys(String tableName) { + return tableName.equals("dTokenTable") || tableName.equals("s3SecretTable"); + } + + /** + * Process table sequentially using key-only iterator. + * Used for tables with non-String keys or as fallback. 
+ */ + private void processTableSequentially(String tableName, OMMetadataManager omMetadataManager) throws IOException { + LOG.info("{}: Processing table {} sequentially (non-String keys)", getTaskName(), tableName); + + Table table = omMetadataManager.getStore() + .getTable(tableName, ByteArrayCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE); + try (TableIterator keyIterator = table.keyIterator()) { + long count = Iterators.size(keyIterator); + objectCountMap.put(getTableCountKeyFromTable(tableName), count); + } + } + + /** + * Process table in parallel using multiple iterators and workers. + * Only for tables with String keys. + */ + private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager) throws Exception { + int workerCount = 2; // Only 2 workers needed for simple counting + + Table table = omMetadataManager.getStore() + .getTable(tableName, StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE); + + long estimatedCount = 100000; // Default + try { + estimatedCount = table.getEstimatedKeyCount(); + } catch (IOException e) { + LOG.info("Could not estimate key count for table {}, using default", tableName); + } + long loggingThreshold = calculateLoggingThreshold(estimatedCount); + + AtomicLong count = new AtomicLong(0); + + try (ParallelTableIteratorOperation parallelIter = new ParallelTableIteratorOperation<>( + omMetadataManager, table, StringCodec.get(), + maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) { + + parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> { + if (kv != null) { + count.incrementAndGet(); + } + return null; + }); + } + + objectCountMap.put(getTableCountKeyFromTable(tableName), count.get()); + } + + /** + * Calculate logging threshold based on estimated key count. + * Logs progress every 1% of total keys, minimum 1. + */ + private long calculateLoggingThreshold(long estimatedCount) { + return Math.max(estimatedCount / 100, 1); + } + @Override public String getTaskName() { return "OmTableInsightTask"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OpenKeysInsightHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OpenKeysInsightHandler.java index d22963ed2807..b78e8cb1518f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OpenKeysInsightHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OpenKeysInsightHandler.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,18 +128,18 @@ public void handleUpdateEvent(OMDBUpdateEvent event, * that are currently open in the backend. 
*/ @Override - public Triple getTableSizeAndCount( - TableIterator> iterator) - throws IOException { + public Triple getTableSizeAndCount(String tableName, + OMMetadataManager omMetadataManager) throws IOException { long count = 0; long unReplicatedSize = 0; long replicatedSize = 0; - if (iterator != null) { + Table table = (Table) omMetadataManager.getTable(tableName); + try (TableIterator> iterator = table.iterator()) { while (iterator.hasNext()) { - Table.KeyValue kv = iterator.next(); + Table.KeyValue kv = iterator.next(); if (kv != null && kv.getValue() != null) { - OmKeyInfo omKeyInfo = (OmKeyInfo) kv.getValue(); + OmKeyInfo omKeyInfo = kv.getValue(); unReplicatedSize += omKeyInfo.getDataSize(); replicatedSize += omKeyInfo.getReplicatedSize(); count++; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index 69f866239878..971614325829 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -218,6 +218,7 @@ public synchronized boolean reInitializeTasks(ReconOMMetadataManager omMetadataM }); AtomicBoolean isRunSuccessful = new AtomicBoolean(true); + LOG.info("Submitting {} tasks for parallel reprocessing", tasks.size()); try { CompletableFuture.allOf(tasks.stream() .map(task -> { @@ -225,8 +226,11 @@ public synchronized boolean reInitializeTasks(ReconOMMetadataManager omMetadataM long reprocessStartTime = Time.monotonicNow(); return CompletableFuture.supplyAsync(() -> { + LOG.info("Task {} started execution on thread {}", + task.getTaskName(), Thread.currentThread().getName()); try { ReconOmTask.TaskResult result = task.call(); + LOG.info("Task {} completed execution", task.getTaskName()); return result; } catch (Exception e) { // Track reprocess failure per task diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java new file mode 100644 index 000000000000..c91a3d97775b --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility to iterate a table in parallel by splitting it into multiple key-range iterators. + */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + + // Thread pools: one for segment iterators, one for batch workers + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; // Cap on pending iterator tasks + this.maxWorkerTasks = workerCount * 2; // Cap on pending worker tasks + + // Fixed pool of iteratorCount iterator threads; the queue is unbounded and + // back-pressure is applied through waitForQueueSize instead + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + + // Fixed pool of workerCount worker threads, also with an unbounded queue + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + + // Batch size per worker: maxNumberOfValsInMemory spread across workers (e.g. 2000 / 20 = 100), floor of 10 + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + private List<K> getBounds(K startKey, K endKey) throws IOException { + Set<K> keys = new HashSet<>(); + + // Try to get SST file boundaries for optimal segmentation + // In test/mock environments, this may not be available + try { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + if (store != null && store.getDb() != null) { + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + String tableName = table.getName(); + + // Only filter by column family if table name is available + if (tableName != null && !tableName.isEmpty()) { + byte[] tableNameBytes = 
tableName.getBytes(StandardCharsets.UTF_8); + for (LiveFileMetaData sstFile : sstFiles) { + // Filter SST files by column family to get bounds only for this specific table + if (Arrays.equals(sstFile.columnFamilyName(), tableNameBytes)) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + } + } + } + } catch (Exception e) { + // If we can't get SST files (test environment, permissions, etc.), + // just use empty bounds and rely on fallback path + LOG.debug("Unable to retrieve SST file boundaries, will use fallback iteration: {}", e.getMessage()); + } + + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + // Main parallelization logic + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + + // Fallback for small tables (no SST files yet - data only in memtable) + if (bounds.size() < 2) { + try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) { + if (startKey != null) { + iter.seek(startKey); + } + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (endKey != null && kv.getKey().compareTo(endKey) > 0) { + break; + } + keyOperation.apply(kv); + } + } + return; + } + + // ===== PARALLEL PROCESSING SETUP ===== + + // Futures of iterator tasks (segment readers that produce batches) + Queue<Future<?>> iterFutures = new LinkedList<>(); + + // Futures of worker tasks (batch consumers that apply keyOperation) + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + Object logLock = new Object(); + + // ===== START ITERATOR TASKS ===== + // For each pair of adjacent segment boundaries, submit one iterator task + // Example: If bounds = [0, 5M, 10M, 15M, 20M], this loop runs 4 times: + // idx=1: beg=0, end=5M + // idx=2: beg=5M, end=10M + // idx=3: beg=10M, end=15M + // idx=4: beg=15M, end=20M + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + + // ===== SUBMIT ITERATOR TASK FOR THIS SEGMENT ===== + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + K key = kv.getKey(); + + // Check if key is within this segment's range + boolean withinBounds; + if (inclusive) { + // Last segment: include everything from beg onwards (or until endKey if specified) + withinBounds = (endKey == null || key.compareTo(endKey) <= 0); + } else { + // Middle segment: include keys in range [beg, end) + withinBounds = key.compareTo(end) < 0; + } + + if (withinBounds) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + + // If 
the batch has reached maxNumberOfVals keys, stop collecting and hand it off + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + + // ===== HAND BATCH TO WORKER POOL ===== + if (!keyValues.isEmpty()) { + // Block while too many worker batches are still pending (bounded by maxWorkerTasks) + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + + // Submit batch to worker thread pool + workerFutures.add(valueExecutors.submit(() -> { + for (Table.KeyValue<K, V> kv : keyValues) { + keyOperation.apply(kv); + } + keyCounter.addAndGet(keyValues.size()); + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + synchronized (logLock) { + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + long cnt = keyCounter.get(); + LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName); + prevLogCounter.set(cnt); + } + } + } + // Batch processed; this worker future completes here. + })); + } + // If we reached the end of our segment, stop reading + if (reachedEnd) { + break; + } + } + } catch (IOException e) { + LOG.error("IO error during parallel iteration for task {}", taskName, e); + throw new RuntimeException("IO error during iteration", e); + } catch (InterruptedException e) { + LOG.warn("Parallel iteration interrupted for task {}", taskName, e); + Thread.currentThread().interrupt(); + throw new RuntimeException("Iteration interrupted", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause); + throw new RuntimeException("Task execution failed", cause); + } + })); + } + + // ===== WAIT FOR ITERATORS AND WORKERS TO FINISH ===== + // Drain all pending iterator futures + waitForQueueSize(iterFutures, 0); + // Drain all pending worker futures + waitForQueueSize(workerFutures, 0); + + // Log final stats + LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get()); + } + + @Override + public void close() throws IOException { + iteratorExecutor.shutdown(); + valueExecutors.shutdown(); + try { + if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + iteratorExecutor.shutdownNow(); + } + if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) { + valueExecutors.shutdownNow(); + } + } catch (InterruptedException e) { + iteratorExecutor.shutdownNow(); + valueExecutors.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} + + + diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/package-info.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/package-info.java new file mode 100644 index 000000000000..f8ec57de2f2f --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package to define utility classes for tasks. + */ +package org.apache.hadoop.ozone.recon.tasks.util; + + + + + diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index 4794ecf1f309..7df56a57be65 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -47,6 +47,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.ws.rs.WebApplicationException; @@ -77,6 +81,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.recon.ReconConstants; import org.apache.hadoop.ozone.recon.ReconTestInjector; import org.apache.hadoop.ozone.recon.api.types.ContainerDiscrepancyInfo; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; @@ -100,9 +105,11 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperHelper; import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskFSO; import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTaskOBS; import org.apache.hadoop.ozone.recon.tasks.NSSummaryTaskWithFSO; +import org.apache.hadoop.ozone.recon.tasks.ReconOmTask; import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers; import org.junit.jupiter.api.BeforeEach; @@ -216,7 +223,15 @@ public void setUp() throws Exception { if (!isSetupDone) { initializeInjector(); isSetupDone = true; + } else { + // Clear shared state before subsequent tests to prevent data leakage + ContainerKeyMapperHelper.clearSharedContainerCountMap(); + ReconConstants.resetTableTruncatedFlags(); + + // Reinitialize container tables to clear RocksDB data + reconContainerMetadataManager.reinitWithNewContainerDataFromOm(Collections.emptyMap()); } + omConfiguration = new OzoneConfiguration(); List omKeyLocationInfoList = new ArrayList<>(); @@ -297,14 +312,27 @@ public void setUp() throws Exception { reprocessContainerKeyMapper(); } - private void reprocessContainerKeyMapper() { + private void reprocessContainerKeyMapper() throws Exception { ContainerKeyMapperTaskOBS containerKeyMapperTaskOBS = new ContainerKeyMapperTaskOBS(reconContainerMetadataManager, omConfiguration); - containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager); - ContainerKeyMapperTaskFSO containerKeyMapperTaskFSO = new ContainerKeyMapperTaskFSO(reconContainerMetadataManager, omConfiguration); - containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager); + + // Run both tasks in parallel (like production) + ExecutorService executor = 
Executors.newFixedThreadPool(2); + try { + Future obsFuture = executor.submit( + () -> containerKeyMapperTaskOBS.reprocess(reconOMMetadataManager)); + Future fsoFuture = executor.submit( + () -> containerKeyMapperTaskFSO.reprocess(reconOMMetadataManager)); + + // Wait for both to complete + obsFuture.get(); + fsoFuture.get(); + } finally { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } } private void setUpFSOData() throws IOException { @@ -435,7 +463,7 @@ private OmKeyLocationInfoGroup getLocationInfoGroup1() { } @Test - public void testGetKeysForContainer() throws IOException { + public void testGetKeysForContainer() throws Exception { Response response = containerEndpoint.getKeysForContainer(1L, -1, ""); KeysResponse data = (KeysResponse) response.getEntity(); @@ -513,7 +541,7 @@ public void testGetKeysForContainer() throws IOException { } @Test - public void testGetKeysForContainerWithPrevKey() throws IOException { + public void testGetKeysForContainerWithPrevKey() throws Exception { // test if prev-key param works as expected Response response = containerEndpoint.getKeysForContainer( 1L, -1, "/sampleVol/bucketOne/key_one"); @@ -1370,7 +1398,7 @@ public void testGetContainerInsightsNonSCMContainers() @Test public void testGetContainerInsightsNonSCMContainersWithPrevKey() - throws IOException, TimeoutException { + throws Exception { // Add 3 more containers to OM making total container in OM to 5 String[] keys = {"key_three", "key_four", "key_five"}; @@ -1821,7 +1849,7 @@ private void setUpDuplicateFSOFileKeys() throws IOException { * and then verifies that the ContainerEndpoint returns two distinct key records. */ @Test - public void testDuplicateFSOKeysForContainerEndpoint() throws IOException { + public void testDuplicateFSOKeysForContainerEndpoint() throws Exception { // Set up duplicate FSO file keys. 
setUpDuplicateFSOFileKeys(); NSSummaryTaskWithFSO nSSummaryTaskWithFso = diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java index 653de02cdbc2..ce8ab5ce2162 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java @@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.hadoop.ozone.recon.ReconConstants; import org.apache.hadoop.ozone.recon.ReconTestInjector; import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; @@ -95,6 +96,10 @@ public void setUp() throws Exception { .build(); reconContainerMetadataManager = reconTestInjector.getInstance(ReconContainerMetadataManager.class); + + // Clear shared container count map and reset flags for clean test state + ContainerKeyMapperHelper.clearSharedContainerCountMap(); + ReconConstants.resetTableTruncatedFlags(); } @Test diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java index ca4a0399be80..c4b1041f76af 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java @@ -134,6 +134,7 @@ public void testReprocess() throws IOException { // Note: Even though legacy and OBS share the same underlying table, we simulate OBS here. when(omMetadataManager.getKeyTable(eq(BucketLayout.OBJECT_STORE))) .thenReturn(keyTableOBS); + when(keyTableOBS.getName()).thenReturn("keyTable"); // Mock table name for parallelization TypedTable.TypedTableIterator mockIterOBS = mock(TypedTable.TypedTableIterator.class); when(keyTableOBS.iterator()).thenReturn(mockIterOBS); // Simulate three keys then end. 
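The getName() stubs added in these tests matter because ParallelTableIteratorOperation#getBounds() matches SST files against the table's column family name, so a mocked table should return a concrete name for the parallelized reprocess path. A minimal illustrative stub, reusing the Mockito helpers already imported by these tests (variable names are hypothetical):

TypedTable fileTable = mock(TypedTable.class);
// Give the parallel path a real table name; getBounds() uses it to match SST files.
when(fileTable.getName()).thenReturn("fileTable");
// A plain iterator keeps the sequential fallback path working for small or mocked tables.
when(fileTable.iterator()).thenReturn(mock(TypedTable.TypedTableIterator.class));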
@@ -146,6 +147,7 @@ public void testReprocess() throws IOException { TypedTable keyTableFSO = mock(TypedTable.class); when(omMetadataManager.getKeyTable(eq(BucketLayout.FILE_SYSTEM_OPTIMIZED))) .thenReturn(keyTableFSO); + when(keyTableFSO.getName()).thenReturn("fileTable"); // Mock table name for parallelization TypedTable.TypedTableIterator mockIterFSO = mock(TypedTable.TypedTableIterator.class); when(keyTableFSO.iterator()).thenReturn(mockIterFSO); when(mockIterFSO.hasNext()).thenReturn(true, true, true, false); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java index faa158bfab3d..4f64d27297b2 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java @@ -38,6 +38,8 @@ import static org.apache.ozone.recon.schema.generated.tables.GlobalStatsTable.GLOBAL_STATS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,6 +55,7 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TypedTable; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -346,6 +349,25 @@ public void testProcessForDeletedDirectoryTable() throws IOException { @Test public void testReprocessForCount() throws Exception { OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class); + + // Mock DBStore for getStore() calls + DBStore mockStore = mock(DBStore.class); + when(omMetadataManager.getStore()).thenReturn(mockStore); + + // Mock getDeletedTable() for DeletedKeysInsightHandler + TypedTable deletedTable = mock(TypedTable.class); + TypedTable.TypedTableIterator deletedIter = mock(TypedTable.TypedTableIterator.class); + when(deletedTable.iterator()).thenReturn(deletedIter); + when(omMetadataManager.getDeletedTable()).thenReturn(deletedTable); + when(deletedIter.hasNext()).thenReturn(true, true, true, true, true, false); + + RepeatedOmKeyInfo deletedKeyInfo = mock(RepeatedOmKeyInfo.class); + when(deletedKeyInfo.getTotalSize()).thenReturn(ImmutablePair.of(100L, 100L)); + when(deletedKeyInfo.getOmKeyInfoList()).thenReturn(Arrays.asList(mock(OmKeyInfo.class))); + + Table.KeyValue deletedKv = mock(Table.KeyValue.class); + when(deletedKv.getValue()).thenReturn(deletedKeyInfo); + when(deletedIter.next()).thenReturn(deletedKv); // Mock 5 rows in each table and test the count for (String tableName : omTableInsightTask.getTaskTables()) { @@ -353,8 +375,13 @@ public void testReprocessForCount() throws Exception { TypedTable.TypedTableIterator mockIter = mock(TypedTable.TypedTableIterator.class); when(table.iterator()).thenReturn(mockIter); + when(table.keyIterator()).thenReturn(mockIter); + when(table.getEstimatedKeyCount()).thenReturn(5L); when(omMetadataManager.getTable(tableName)).thenReturn(table); when(mockIter.hasNext()).thenReturn(true, true, true, true, true, false); + + // Mock DBStore.getTable() 
to return the same table + when(mockStore.getTable(eq(tableName), any(), any(), any())).thenAnswer(invocation -> table); final Table.KeyValue mockKeyValue = mock(Table.KeyValue.class);
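For context, a minimal usage sketch of the new ParallelTableIteratorOperation utility; the table name, thread counts, batch size, and logging threshold below are illustrative, whereas OmTableInsightTask derives the real values from the ReconServerConfigKeys constants and the table's estimated key count shown earlier:

Table<String, byte[]> table = omMetadataManager.getStore()
    .getTable("keyTable", StringCodec.get(), ByteArrayCodec.get(), TableCache.CacheType.NO_CACHE);
AtomicLong keyCount = new AtomicLong();
try (ParallelTableIteratorOperation<String, byte[]> parallelIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, table, StringCodec.get(),
             5, 20, 2000, 10_000)) {
  // Full-table scan (null start/end keys); each batch is handed to a worker thread and
  // the lambda runs once per key-value pair.
  parallelIter.performTaskOnTableVals("ExampleCountTask", null, null, kv -> {
    keyCount.incrementAndGet();
    return null;
  });
}
// A table with no SST files yields fewer than two bounds, so the same call degrades to a
// single sequential iterator.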