diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7a67f27cc49e..167e30ba2dac 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4336,7 +4336,7 @@
       recon rocks DB containerKeyTable
     </description>
   </property>
-
+
   <property>
     <name>ozone.recon.filesizecount.flush.db.max.threshold</name>
    <value>200000</value>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index dc75200214ff..a57095d2e4c7 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -153,7 +153,7 @@ public final class ReconServerConfigKeys {
       "ozone.recon.nssummary.flush.db.max.threshold";
   public static final long
-      OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+      OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 300 * 1000L;
   public static final String
       OZONE_RECON_CONTAINER_KEY_FLUSH_TO_DB_MAX_THRESHOLD =
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryAsyncFlusher.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryAsyncFlusher.java
new file mode 100644
index 000000000000..d25eaced22ff
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryAsyncFlusher.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.ozone.recon.api.types.NSSummary;
+import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Async flusher for NSSummary maps with a background thread.
+ * Workers submit their maps to a queue; the background thread drains and flushes them.
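+ *
+ * Intended usage (illustrative sketch; variable names are placeholders,
+ * methods are this class's own API):
+ * <pre>
+ *   try (NSSummaryAsyncFlusher flusher =
+ *       NSSummaryAsyncFlusher.create(summaryManager, "MyTask", queueCapacity)) {
+ *     flusher.submitForFlush(workerMap);   // blocks when the queue is full
+ *     flusher.checkForFailures();          // surfaces background-thread errors
+ *   } // close() drains the queue, then rethrows any background failure
+ * </pre>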
+ */
+public final class NSSummaryAsyncFlusher implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(NSSummaryAsyncFlusher.class);
+
+  private final BlockingQueue<Map<Long, NSSummary>> flushQueue;
+  private final Thread backgroundFlusher;
+  private final AtomicReference<FlushState> state =
+      new AtomicReference<>(FlushState.RUNNING);
+  private volatile Exception failureCause = null;
+  private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
+  private final String taskName;
+
+  private enum FlushState {
+    RUNNING,
+    STOPPING,
+    STOPPED,
+    FAILED
+  }
+
+  private NSSummaryAsyncFlusher(ReconNamespaceSummaryManager reconNamespaceSummaryManager,
+                                String taskName,
+                                int queueCapacity) {
+    this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
+    this.taskName = taskName;
+    this.flushQueue = new LinkedBlockingQueue<>(queueCapacity);
+
+    this.backgroundFlusher = new Thread(this::flushLoop, taskName + "-AsyncFlusher");
+    this.backgroundFlusher.setDaemon(true);
+  }
+
+  /**
+   * Factory method to create and start an async flusher.
+   */
+  public static NSSummaryAsyncFlusher create(
+      ReconNamespaceSummaryManager reconNamespaceSummaryManager,
+      String taskName,
+      int queueCapacity) {
+    NSSummaryAsyncFlusher flusher = new NSSummaryAsyncFlusher(
+        reconNamespaceSummaryManager, taskName, queueCapacity);
+    flusher.backgroundFlusher.start();
+    LOG.info("{}: Started async flusher with queue capacity {}", taskName, queueCapacity);
+    return flusher;
+  }
+
+  /**
+   * Submit a worker map for async flushing.
+   * Blocks if the queue is full (natural backpressure).
+   * @throws IOException if the async flusher has failed
+   */
+  public void submitForFlush(Map<Long, NSSummary> workerMap)
+      throws InterruptedException, IOException {
+    // Check if the flusher has failed - reject new submissions
+    if (state.get() == FlushState.FAILED) {
+      throw new IOException(taskName + ": Cannot submit - async flusher has failed",
+          failureCause);
+    }
+
+    flushQueue.put(workerMap);
+    LOG.debug("{}: Submitted map with {} entries, queue size now {}",
+        taskName, workerMap.size(), flushQueue.size());
+  }
+
+  /**
+   * Check if the async flusher has encountered any failures.
+   * Workers should call this periodically to detect failures quickly.
+   * @throws IOException if a failure has occurred
+   */
+  public void checkForFailures() throws IOException {
+    if (state.get() == FlushState.FAILED && failureCause != null) {
+      if (failureCause instanceof IOException) {
+        throw (IOException) failureCause;
+      } else {
+        throw new IOException(taskName + ": Async flusher failed", failureCause);
+      }
+    }
+  }
+
+  /**
+   * Background thread loop that processes the flush queue.
+   */
+  private void flushLoop() {
+    while (state.get() == FlushState.RUNNING || !flushQueue.isEmpty()) {
+      try {
+        // Attempt to retrieve one batch from the queue
+        Map<Long, NSSummary> workerMap = flushQueue.poll(100, TimeUnit.MILLISECONDS);
+
+        if (workerMap == null) {
+          continue;
+        }
+
+        // Process this batch
+        LOG.debug("{}: Background thread processing batch with {} entries", taskName, workerMap.size());
+        flushWithPropagation(workerMap);
+        LOG.debug("{}: Background thread finished batch", taskName);
+
+      } catch (InterruptedException e) {
+        // If we're stopping, ignore interrupts and keep draining the queue.
+        // Otherwise, preserve the interrupt and exit.
+        if (state.get() == FlushState.STOPPING) {
+          LOG.debug("{}: Flusher thread interrupted while stopping; continuing to drain queue",
+              taskName);
+          Thread.interrupted(); // clear interrupt flag
+          continue;
+        }
+        LOG.info("{}: Flusher thread interrupted", taskName);
+        Thread.currentThread().interrupt();
+        break;
+      } catch (IOException e) {
+        // For DB write errors
+        LOG.error("{}: FATAL - DB write failed, stopping async flusher. " +
+            "Remaining {} batches in queue will NOT be processed. " +
+            "Workers will be stopped immediately.",
+            taskName, flushQueue.size(), e);
+        failureCause = e;
+        state.set(FlushState.FAILED);
+        break;
+      } catch (Exception e) {
+        // Other unexpected errors are also fatal
+        LOG.error("{}: FATAL - Unexpected error in flush loop, stopping async flusher. " +
+            "Remaining {} batches in queue will NOT be processed. " +
+            "Workers will be stopped immediately.",
+            taskName, flushQueue.size(), e);
+        failureCause = e;
+        state.set(FlushState.FAILED);
+        break;
+      }
+    }
+
+    // Only set STOPPED if we didn't fail
+    if (state.get() != FlushState.FAILED) {
+      state.set(FlushState.STOPPED);
+    }
+
+    LOG.info("{}: Async flusher stopped with state: {}", taskName, state.get());
+  }
+
+  /**
+   * Flush a worker map, propagating deltas to ancestors.
+   */
+  private void flushWithPropagation(Map<Long, NSSummary> workerMap) throws IOException {
+
+    Map<Long, NSSummary> mergedMap = new HashMap<>();
+
+    // For each object in the worker map (could be either a directory or a bucket)
+    for (Map.Entry<Long, NSSummary> entry : workerMap.entrySet()) {
+      long currentObjectId = entry.getKey();
+      NSSummary delta = entry.getValue();
+
+      // Get the actual, up-to-date NSSummary (check the merged map first, then DB)
+      NSSummary existingNSSummary = mergedMap.get(currentObjectId);
+      if (existingNSSummary == null) {
+        existingNSSummary = reconNamespaceSummaryManager.getNSSummary(currentObjectId);
+      }
+
+      if (existingNSSummary == null) {
+        // Object doesn't exist in DB yet - use delta as base (has metadata like dirName, parentId)
+        existingNSSummary = delta;
+      } else {
+        // Object exists in DB - merge delta into it
+
+        // Skip numeric merging if delta has no file data (e.g., directory skeleton with zero counts)
+        if (delta.getNumOfFiles() > 0 || delta.getSizeOfFiles() > 0) {
+          existingNSSummary.setNumOfFiles(
+              existingNSSummary.getNumOfFiles() + delta.getNumOfFiles());
+          existingNSSummary.setSizeOfFiles(
+              existingNSSummary.getSizeOfFiles() + delta.getSizeOfFiles());
+          existingNSSummary.setReplicatedSizeOfFiles(
+              existingNSSummary.getReplicatedSizeOfFiles() + delta.getReplicatedSizeOfFiles());
+
+          // Merge file size buckets
+          int[] actualBucket = existingNSSummary.getFileSizeBucket();
+          int[] deltaBucket = delta.getFileSizeBucket();
+          for (int i = 0; i < actualBucket.length; i++) {
+            actualBucket[i] += deltaBucket[i];
+          }
+          existingNSSummary.setFileSizeBucket(actualBucket);
+        }
+
+        // Merge child dirs (needed for directory relationships)
+        existingNSSummary.getChildDir().addAll(delta.getChildDir());
+
+        // Repair dirName if the existing entry is missing it and delta has the value
+        if (StringUtils.isEmpty(existingNSSummary.getDirName()) && StringUtils.isNotEmpty(delta.getDirName())) {
+          existingNSSummary.setDirName(delta.getDirName());
+        }
+        // Repair parentId if the existing entry is missing it and delta has the value
+        if (existingNSSummary.getParentId() == 0 && delta.getParentId() != 0) {
+          existingNSSummary.setParentId(delta.getParentId());
+        }
+      }
+
+      // Store the updated object in the merged map
+      mergedMap.put(currentObjectId, existingNSSummary);
+
+      if (delta.getSizeOfFiles() > 0 ||
+          delta.getNumOfFiles() > 0) {
+        // Propagate delta to ancestors (parent, grandparent, etc.)
+        propagateDeltaToAncestors(existingNSSummary.getParentId(), delta, mergedMap);
+      }
+    }
+
+    // Write the merged map to DB
+    writeToDb(mergedMap);
+    LOG.debug("{}: Flush completed, wrote {} entries", taskName, mergedMap.size());
+  }
+
+  /**
+   * Recursively propagate delta values up the ancestor chain.
+   * Pattern: check the merged map first (for updates in this batch), then DB, for the ACTUAL value.
+   */
+  private void propagateDeltaToAncestors(long ancestorId, NSSummary delta,
+      Map<Long, NSSummary> mergedMap) throws IOException {
+    // Base case: reached above bucket level
+    if (ancestorId == 0) {
+      return;
+    }
+
+    // Get the ACTUAL ancestor (check the merged map first for the most up-to-date value, then DB)
+    NSSummary actualAncestor = mergedMap.get(ancestorId);
+    if (actualAncestor == null) {
+      actualAncestor = reconNamespaceSummaryManager.getNSSummary(ancestorId);
+      if (actualAncestor == null) {
+        // Ancestor not in DB yet; nothing to update
+        return;
+      }
+    }
+
+    // Apply DELTA to the ACTUAL ancestor
+    actualAncestor.setNumOfFiles(
+        actualAncestor.getNumOfFiles() + delta.getNumOfFiles());
+    actualAncestor.setSizeOfFiles(
+        actualAncestor.getSizeOfFiles() + delta.getSizeOfFiles());
+    actualAncestor.setReplicatedSizeOfFiles(
+        actualAncestor.getReplicatedSizeOfFiles() + delta.getReplicatedSizeOfFiles());
+
+    // Store the updated ACTUAL ancestor in the merged map
+    mergedMap.put(ancestorId, actualAncestor);
+
+    // Recursively propagate to the next ancestor (grandparent, great-grandparent, etc.)
+    propagateDeltaToAncestors(actualAncestor.getParentId(), delta, mergedMap);
+  }
+
+  /**
+   * Write the merged map to DB using a batch operation.
+   */
+  private void writeToDb(Map<Long, NSSummary> mergedMap) throws IOException {
+    try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) {
+      for (Map.Entry<Long, NSSummary> entry : mergedMap.entrySet()) {
+        reconNamespaceSummaryManager.batchStoreNSSummaries(
+            rdbBatchOperation, entry.getKey(), entry.getValue());
+      }
+      reconNamespaceSummaryManager.commitBatchOperation(rdbBatchOperation);
+      LOG.debug("{}: Wrote {} entries to DB", taskName, mergedMap.size());
+    } catch (IOException e) {
+      LOG.error("{}: Failed to flush to DB", taskName, e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("{}: Shutting down async flusher", taskName);
+    try {
+      // Tell the background thread to stop once the queue is drained.
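+      // Note: flushLoop() keeps running while the state is RUNNING or the queue
+      // is non-empty, so setting STOPPING lets in-flight batches finish before
+      // the thread exits; join() below then waits for that drain to complete.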
+      state.set(FlushState.STOPPING);
+      backgroundFlusher.join();
+
+      // Check if there were any failures during processing
+      checkForFailures();
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while shutting down async flusher", e);
+    }
+
+    LOG.info("{}: Async flusher shutdown complete", taskName);
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
index 6b4a5cce0e71..e6b69f390633 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.slf4j.Logger;
@@ -101,18 +102,30 @@ public NSSummaryTask(ReconNamespaceSummaryManager
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
     this.reconOMMetadataManager = reconOMMetadataManager;
     this.ozoneConfiguration = ozoneConfiguration;
+
     long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
         OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
         OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
-
-    this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager, reconOMMetadataManager,
-        nsSummaryFlushToDBMaxThreshold);
-    this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
-        reconNamespaceSummaryManager, reconOMMetadataManager,
-        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
-    this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
-        reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold);
+
+    int maxKeysInMemory = ozoneConfiguration.getInt(
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY,
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT);
+    int maxIterators = ozoneConfiguration.getInt(
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS,
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT);
+    int maxWorkers = ozoneConfiguration.getInt(
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS,
+        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT);
+
+    this.nsSummaryTaskWithFSO =
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold,
+            maxIterators, maxWorkers, maxKeysInMemory);
+    this.nsSummaryTaskWithLegacy =
+        new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager, reconOMMetadataManager, ozoneConfiguration,
+            nsSummaryFlushToDBMaxThreshold);
+    this.nsSummaryTaskWithOBS =
+        new NSSummaryTaskWithOBS(reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold,
+            maxIterators, maxWorkers, maxKeysInMemory);
   }
 
   @Override
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
index 3569bae71a18..f4e7163e3b90 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java
@@ -119,6 +119,39 @@ protected void handlePutKeyEvent(OmKeyInfo keyInfo,
                                    Map<Long, NSSummary> nsSummaryMap,
                                    long parentObjectId) throws IOException {
+
+  /**
+   * Handle PUT key event during reprocess operation.
+   * Does not read from DB, only works with the in-memory map.
+   */
+  protected void handlePutKeyEventReprocess(OmKeyInfo keyInfo,
+                                            Map<Long, NSSummary> nsSummaryMap,
+                                            long parentObjectId)
+      throws IOException {
+    // Get from the local thread map only, no DB reads during reprocess
+    NSSummary nsSummary = nsSummaryMap.get(parentObjectId);
+    if (nsSummary == null) {
+      // Create a new instance if not found
+      nsSummary = new NSSummary();
+    }
+    int[] fileBucket = nsSummary.getFileSizeBucket();
+
+    // Update the immediate parent's totals (includes all descendant files)
+    nsSummary.setNumOfFiles(nsSummary.getNumOfFiles() + 1);
+    nsSummary.setSizeOfFiles(nsSummary.getSizeOfFiles() + keyInfo.getDataSize());
+    // Before arithmetic operations, check for the sentinel value
+    long currentReplSize = nsSummary.getReplicatedSizeOfFiles();
+    if (currentReplSize < 0) {
+      // Old data, initialize to 0 before first use
+      currentReplSize = 0;
+      nsSummary.setReplicatedSizeOfFiles(0);
+    }
+    nsSummary.setReplicatedSizeOfFiles(currentReplSize + keyInfo.getReplicatedSize());
+    int binIndex = ReconUtils.getFileSizeBinIndex(keyInfo.getDataSize());
+
+    ++fileBucket[binIndex];
+    nsSummary.setFileSizeBucket(fileBucket);
+    nsSummaryMap.put(parentObjectId, nsSummary);
+  }
+
   protected void handlePutDirEvent(OmDirectoryInfo directoryInfo,
                                    Map<Long, NSSummary> nsSummaryMap)
       throws IOException {
@@ -181,6 +214,42 @@ protected void handlePutDirEvent(OmDirectoryInfo directoryInfo,
     }
   }
 
+  /**
+   * Handle PUT directory event during reprocess operation.
+   * Does not read from DB, only works with the in-memory map.
+   */
+  protected void handlePutDirEventReprocess(OmDirectoryInfo directoryInfo,
+                                            Map<Long, NSSummary> nsSummaryMap)
+      throws IOException {
+    long parentObjectId = directoryInfo.getParentObjectID();
+    long objectId = directoryInfo.getObjectID();
+    // write the dir name to the current directory
+    String dirName = directoryInfo.getName();
+
+    // Get or create the directory's NSSummary (no DB reads during reprocess)
+    NSSummary curNSSummary = nsSummaryMap.get(objectId);
+    if (curNSSummary == null) {
+      curNSSummary = new NSSummary();
+    }
+
+    curNSSummary.setDirName(dirName);
+    curNSSummary.setParentId(parentObjectId);
+    nsSummaryMap.put(objectId, curNSSummary);
+
+    // Get or create the parent's NSSummary (no DB reads during reprocess)
+    NSSummary parentNSSummary = nsSummaryMap.get(parentObjectId);
+    if (parentNSSummary == null) {
+      // Create a new instance if not found
+      parentNSSummary = new NSSummary();
+    }
+
+    // Add the child directory to the parent
+    parentNSSummary.addChildDir(objectId);
+
+    // Store the updated parent back in the map
+    nsSummaryMap.put(parentObjectId, parentNSSummary);
+  }
+
   protected void handleDeleteKeyEvent(
       OmKeyInfo keyInfo,
       Map<Long, NSSummary> nsSummaryMap,
@@ -342,7 +411,7 @@ protected void propagateSizeUpwards(long objectId, long sizeChange, long replica
     parentSummary.setReplicatedSizeOfFiles(parentReplSize + replicatedSizeChange);
     parentSummary.setNumOfFiles(parentSummary.getNumOfFiles() + countChange);
     nsSummaryMap.put(parentId, parentSummary);
-
+
     // Recursively propagate to grandparents
     propagateSizeUpwards(parentId, sizeChange, replicatedSizeChange, countChange, nsSummaryMap);
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
index 0c3287d9eabd..9f281a908eed 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java
@@ -29,10 +29,12 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,15 +55,24 @@ public class NSSummaryTaskWithFSO extends NSSummaryTaskDbEventHandler {
       LoggerFactory.getLogger(NSSummaryTaskWithFSO.class);
 
   private final long nsSummaryFlushToDBMaxThreshold;
+  private final int maxIterators;
+  private final int maxWorkers;
+  private final int maxKeysInMemory;
 
   public NSSummaryTaskWithFSO(ReconNamespaceSummaryManager
                               reconNamespaceSummaryManager,
                               ReconOMMetadataManager
                               reconOMMetadataManager,
-                              long nsSummaryFlushToDBMaxThreshold) {
+                              long nsSummaryFlushToDBMaxThreshold,
+                              int maxIterators,
+                              int maxWorkers,
+                              int maxKeysInMemory) {
     super(reconNamespaceSummaryManager, reconOMMetadataManager);
     this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
+    this.maxIterators = maxIterators;
+    this.maxWorkers = maxWorkers;
+    this.maxKeysInMemory = maxKeysInMemory;
   }
 
   // We listen to updates from FSO-enabled FileTable, DirTable, DeletedTable and DeletedDirTable
@@ -222,55 +234,174 @@ private void handleUpdateOnFileTable(OMDBUpdateEvent<String, OmKeyInfo>
   public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) {
-    Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+    // We run reprocess in two phases with separate flushers so that directory
+    // skeletons are fully persisted before file updates rely on them.
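+    // Queue capacity of maxWorkers * 10 is a heuristic: enough headroom that
+    // workers rarely block on submitForFlush, while still bounding the memory
+    // held by queued per-worker maps.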
+    final int queueCapacity = maxWorkers * 10;
 
-    try {
-      Table<String, OmDirectoryInfo> dirTable =
-          omMetadataManager.getDirectoryTable();
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
-          dirTableIter = dirTable.iterator()) {
-        while (dirTableIter.hasNext()) {
-          Table.KeyValue<String, OmDirectoryInfo> kv = dirTableIter.next();
-          OmDirectoryInfo directoryInfo = kv.getValue();
-          handlePutDirEvent(directoryInfo, nsSummaryMap);
-          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
-            if (!flushAndCommitNSToDB(nsSummaryMap)) {
-              return false;
-            }
-          }
-        }
-      }
+    try (NSSummaryAsyncFlusher dirFlusher =
+        NSSummaryAsyncFlusher.create(getReconNamespaceSummaryManager(),
+            "NSSummaryTaskWithFSO-dir", queueCapacity)) {
+      if (!processDirTableInParallel(omMetadataManager, dirFlusher)) {
+        return false;
+      }
+    } catch (Exception ex) {
+      LOG.error("Unable to reprocess Namespace Summary data in Recon DB (dir phase).", ex);
+      return false;
+    }
+
+    try (NSSummaryAsyncFlusher fileFlusher =
+        NSSummaryAsyncFlusher.create(getReconNamespaceSummaryManager(),
+            "NSSummaryTaskWithFSO-file", queueCapacity)) {
+      if (!processFileTableInParallel(omMetadataManager, fileFlusher)) {
+        return false;
+      }
+    } catch (Exception ex) {
+      LOG.error("Unable to reprocess Namespace Summary data in Recon DB (file phase).", ex);
+      return false;
+    }
+
+    LOG.info("Completed a reprocess run of NSSummaryTaskWithFSO");
+    return true;
+  }
 
-      // Get fileTable used by FSO
-      Table<String, OmKeyInfo> keyTable =
-          omMetadataManager.getFileTable();
-
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-          keyTableIter = keyTable.iterator()) {
-        while (keyTableIter.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> kv = keyTableIter.next();
-          OmKeyInfo keyInfo = kv.getValue();
-          handlePutKeyEvent(keyInfo, nsSummaryMap, keyInfo.getParentObjectID());
-          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
-            if (!flushAndCommitNSToDB(nsSummaryMap)) {
-              return false;
-            }
-          }
-        }
-      }
+  /**
+   * Process dirTable in parallel using per-worker maps with async flushing.
+   */
+  private boolean processDirTableInParallel(OMMetadataManager omMetadataManager,
+      NSSummaryAsyncFlusher asyncFlusher) {
+    Table<String, OmDirectoryInfo> dirTable = omMetadataManager.getDirectoryTable();
+
+    // Per-worker maps for lockless updates
+    Map<Long, Map<Long, NSSummary>> allWorkerMaps = new ConcurrentHashMap<>();
+
+    // Divide the threshold by the worker count
+    final long perWorkerThreshold = Math.max(1, nsSummaryFlushToDBMaxThreshold / maxWorkers);
+
+    Function<Table.KeyValue<String, OmDirectoryInfo>, Void> kvOperation = kv -> {
+      // Get this worker's private map
+      long threadId = Thread.currentThread().getId();
+      Map<Long, NSSummary> workerMap = allWorkerMaps.computeIfAbsent(threadId, k -> new HashMap<>());
+
+      try {
+        // Check if the async flusher has failed - stop immediately if so
+        asyncFlusher.checkForFailures();
+
+        // Update the immediate parent only, NO DB reads during reprocess
+        handlePutDirEventReprocess(kv.getValue(), workerMap);
+
+        // Submit to the async queue when the threshold is reached
+        if (workerMap.size() >= perWorkerThreshold) {
+          asyncFlusher.submitForFlush(workerMap);
+          // Get a fresh map for this worker
+          allWorkerMaps.put(threadId, new HashMap<>());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return null;
+    };
+
+    LOG.debug("Starting dirTable parallel iteration");
+    long dirStartTime = System.currentTimeMillis();
 
-    } catch (IOException ioEx) {
-      LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
-          ioEx);
+    try (ParallelTableIteratorOperation<String, OmDirectoryInfo> parallelIter =
+        new ParallelTableIteratorOperation<>(omMetadataManager, dirTable, StringCodec.get(),
+            maxIterators, maxWorkers, maxKeysInMemory, nsSummaryFlushToDBMaxThreshold)) {
+      parallelIter.performTaskOnTableVals("NSSummaryTaskWithFSO-dirTable", null, null, kvOperation);
+    } catch (Exception ex) {
+      LOG.error("Unable to process dirTable in parallel", ex);
       return false;
     }
-    // flush and commit left out keys at end
-    if (!flushAndCommitNSToDB(nsSummaryMap)) {
-      LOG.info("flushAndCommitNSToDB failed during reprocessWithFSO.");
+
+    long dirEndTime = System.currentTimeMillis();
+    LOG.debug("Completed dirTable parallel iteration in {} ms", (dirEndTime - dirStartTime));
+
+    // Submit any remaining worker maps
+    for (Map<Long, NSSummary> remainingMap : allWorkerMaps.values()) {
+      if (!remainingMap.isEmpty()) {
+        try {
+          asyncFlusher.submitForFlush(remainingMap);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        } catch (IOException e) {
+          LOG.error("Failed to submit remaining map for flush", e);
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Process fileTable in parallel using per-worker maps with async flushing.
+   */
+  private boolean processFileTableInParallel(OMMetadataManager omMetadataManager,
+      NSSummaryAsyncFlusher asyncFlusher) {
+    Table<String, OmKeyInfo> fileTable = omMetadataManager.getFileTable();
+
+    // Per-worker maps for lockless updates
+    Map<Long, Map<Long, NSSummary>> allWorkerMaps = new ConcurrentHashMap<>();
+
+    // Divide the threshold by the worker count
+    final long perWorkerThreshold = Math.max(1, nsSummaryFlushToDBMaxThreshold / maxWorkers);
+
+    Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+      // Get this worker's private map
+      long threadId = Thread.currentThread().getId();
+      Map<Long, NSSummary> workerMap = allWorkerMaps.computeIfAbsent(threadId, k -> new HashMap<>());
+
+      try {
+        // Check if the async flusher has failed - stop immediately if so
+        asyncFlusher.checkForFailures();
+
+        // Update the immediate parent only, NO DB reads during reprocess
+        OmKeyInfo keyInfo = kv.getValue();
+        handlePutKeyEventReprocess(keyInfo, workerMap, keyInfo.getParentObjectID());
+
+        // Submit to the async queue when the threshold is reached
+        if (workerMap.size() >= perWorkerThreshold) {
+          asyncFlusher.submitForFlush(workerMap);
+          // Get a fresh map for this worker
+          allWorkerMaps.put(threadId, new HashMap<>());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return null;
+    };
+
+    LOG.debug("Starting fileTable parallel iteration");
+    long fileStartTime = System.currentTimeMillis();
+
+    try (ParallelTableIteratorOperation<String, OmKeyInfo> parallelIter =
+        new ParallelTableIteratorOperation<>(omMetadataManager, fileTable, StringCodec.get(),
+            maxIterators, maxWorkers, maxKeysInMemory, nsSummaryFlushToDBMaxThreshold)) {
+      parallelIter.performTaskOnTableVals("NSSummaryTaskWithFSO-fileTable", null, null, kvOperation);
+    } catch (Exception ex) {
+      LOG.error("Unable to process fileTable in parallel", ex);
       return false;
     }
-    LOG.info("Completed a reprocess run of NSSummaryTaskWithFSO");
+
+    long fileEndTime = System.currentTimeMillis();
+    LOG.debug("Completed fileTable parallel iteration in {} ms", (fileEndTime - fileStartTime));
+
+    // Submit any remaining worker maps
+    for (Map<Long, NSSummary> remainingMap : allWorkerMaps.values()) {
+      if (!remainingMap.isEmpty()) {
+        try {
+          asyncFlusher.submitForFlush(remainingMap);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        } catch (IOException e) {
+          LOG.error("Failed to submit remaining map for flush", e);
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
index 699e0612544e..a78439616729 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java
@@ -20,13 +20,16 @@
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -35,6 +38,7 @@
 import org.apache.hadoop.ozone.recon.api.types.NSSummary;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.tasks.util.ParallelTableIteratorOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,66 +54,132 @@ public class NSSummaryTaskWithOBS extends NSSummaryTaskDbEventHandler {
 
   private final long nsSummaryFlushToDBMaxThreshold;
+  private final int maxIterators;
+  private final int maxWorkers;
+  private final int maxKeysInMemory;
 
   public NSSummaryTaskWithOBS(
       ReconNamespaceSummaryManager reconNamespaceSummaryManager,
       ReconOMMetadataManager reconOMMetadataManager,
-      long nsSummaryFlushToDBMaxThreshold) {
+      long nsSummaryFlushToDBMaxThreshold,
+      int maxIterators,
+      int maxWorkers,
+      int maxKeysInMemory) {
     super(reconNamespaceSummaryManager, reconOMMetadataManager);
     this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold;
+    this.maxIterators = maxIterators;
+    this.maxWorkers = maxWorkers;
+    this.maxKeysInMemory = maxKeysInMemory;
   }
 
   public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) {
-    Map<Long, NSSummary> nsSummaryMap = new HashMap<>();
+    // Create an async flusher with queue capacity based on the worker count
+    int queueCapacity = maxWorkers * 2;
 
-    try {
-      Table<String, OmKeyInfo> keyTable =
-          omMetadataManager.getKeyTable(BUCKET_LAYOUT);
-
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
-          keyTableIter = keyTable.iterator()) {
-
-        while (keyTableIter.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> kv = keyTableIter.next();
-          OmKeyInfo keyInfo = kv.getValue();
-
-          // KeyTable entries belong to both Legacy and OBS buckets.
-          // Check bucket layout and if it's anything other than OBS,
-          // continue to the next iteration.
-          String volumeName = keyInfo.getVolumeName();
-          String bucketName = keyInfo.getBucketName();
-          String bucketDBKey = omMetadataManager
-              .getBucketKey(volumeName, bucketName);
-          // Get bucket info from bucket table
-          OmBucketInfo omBucketInfo = omMetadataManager
-              .getBucketTable().getSkipCache(bucketDBKey);
-
-          if (omBucketInfo.getBucketLayout() != BUCKET_LAYOUT) {
-            continue;
-          }
-
-          long parentObjectID = getKeyParentID(keyInfo);
-
-          handlePutKeyEvent(keyInfo, nsSummaryMap, parentObjectID);
-          if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) {
-            if (!flushAndCommitNSToDB(nsSummaryMap)) {
-              return false;
-            }
-          }
-        }
-      }
+    try (NSSummaryAsyncFlusher asyncFlusher =
+        NSSummaryAsyncFlusher.create(getReconNamespaceSummaryManager(),
+            "NSSummaryTaskWithOBS", queueCapacity)) {
+
+      if (!processKeyTableInParallel(omMetadataManager, asyncFlusher)) {
+        return false;
+      }
+
+    } catch (Exception ex) {
+      LOG.error("Unable to reprocess Namespace Summary data in Recon DB.", ex);
+      return false;
+    }
+
+    LOG.info("Completed a reprocess run of NSSummaryTaskWithOBS");
+    return true;
+  }
+
+  private boolean processKeyTableInParallel(OMMetadataManager omMetadataManager,
+      NSSummaryAsyncFlusher asyncFlusher) {
+    Table<String, OmKeyInfo> keyTable =
+        omMetadataManager.getKeyTable(BUCKET_LAYOUT);
+
+    // Per-worker maps for lockless updates
+    Map<Long, Map<Long, NSSummary>> allWorkerMaps = new ConcurrentHashMap<>();
+
+    // Divide the threshold by the worker count
+    final long perWorkerThreshold = Math.max(1, nsSummaryFlushToDBMaxThreshold / maxWorkers);
+
+    Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
+      // Get this worker's private map
+      long threadId = Thread.currentThread().getId();
+      Map<Long, NSSummary> workerMap = allWorkerMaps.computeIfAbsent(threadId, k -> new HashMap<>());
+
+      try {
+        OmKeyInfo keyInfo = kv.getValue();
+
+        // KeyTable entries belong to both Legacy and OBS buckets.
+        // Check the bucket layout and skip anything that is not OBS.
+        String volumeName = keyInfo.getVolumeName();
+        String bucketName = keyInfo.getBucketName();
+        String bucketDBKey = omMetadataManager
+            .getBucketKey(volumeName, bucketName);
+        // Get bucket info from the bucket table
+        OmBucketInfo omBucketInfo = omMetadataManager
+            .getBucketTable().getSkipCache(bucketDBKey);
+
+        if (omBucketInfo != null && omBucketInfo.getBucketLayout() == BUCKET_LAYOUT) {
+          // Check if the async flusher has failed - stop immediately if so
+          asyncFlusher.checkForFailures();
+
+          // Look up the parent ID for OBS keys (not populated in keyInfo from DB)
+          long parentObjectId = getKeyParentID(keyInfo);
+
+          // Use the reprocess-specific method (no DB reads)
+          handlePutKeyEventReprocess(keyInfo, workerMap, parentObjectId);
+
+          // Submit to the async queue when the threshold is reached
+          if (workerMap.size() >= perWorkerThreshold) {
+            asyncFlusher.submitForFlush(workerMap);
+            // Get a fresh map for this worker
+            allWorkerMaps.put(threadId, new HashMap<>());
+          }
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      return null;
+    };
 
-    } catch (IOException ioEx) {
-      LOG.error("Unable to reprocess Namespace Summary data in Recon DB. ",
-          ioEx);
-      nsSummaryMap.clear();
+    LOG.debug("Starting keyTable parallel iteration");
+    long keyStartTime = System.currentTimeMillis();
+
+    try (ParallelTableIteratorOperation<String, OmKeyInfo> parallelIter =
+        new ParallelTableIteratorOperation<>(omMetadataManager, keyTable, StringCodec.get(),
+            maxIterators, maxWorkers, maxKeysInMemory, nsSummaryFlushToDBMaxThreshold)) {
+      parallelIter.performTaskOnTableVals("NSSummaryTaskWithOBS-keyTable", null, null, kvOperation);
+    } catch (Exception ex) {
+      LOG.error("Unable to process keyTable in parallel", ex);
       return false;
     }
-
-    // flush and commit left out entries at end
-    if (!flushAndCommitNSToDB(nsSummaryMap)) {
-      return false;
+
+    long keyEndTime = System.currentTimeMillis();
+    LOG.debug("Completed keyTable parallel iteration in {} ms", (keyEndTime - keyStartTime));
+
+    // Submit any remaining worker maps
+    for (Map<Long, NSSummary> remainingMap : allWorkerMaps.values()) {
+      if (!remainingMap.isEmpty()) {
+        try {
+          asyncFlusher.submitForFlush(remainingMap);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        } catch (IOException e) {
+          LOG.error("Failed to submit remaining map for flush", e);
+          return false;
+        }
+      }
     }
-    LOG.debug("Completed a reprocess run of NSSummaryTaskWithOBS");
+
     return true;
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index 59862cebb81f..a4853dbf31da 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -208,7 +208,7 @@ private void processTableInParallel(String tableName, OMMetadataManager omMetada
     try (ParallelTableIteratorOperation parallelIter = new ParallelTableIteratorOperation<>(
         omMetadataManager, table, StringCodec.get(), maxIterators, workerCount, maxKeysInMemory,
         loggingThreshold)) {
-
+
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
           count.incrementAndGet();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
index c91a3d97775b..3c378c36cd9f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
@@ -55,8 +55,8 @@ public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implemen
   private final Codec<K> keyCodec;
 
   // Thread Pools
-  private final ExecutorService iteratorExecutor; // 5
-  private final ExecutorService valueExecutors; // 20
+  private final ExecutorService iteratorExecutor;
+  private final ExecutorService valueExecutors;
 
   private final int maxNumberOfVals;
   private final OMMetadataManager metadataManager;
@@ -72,19 +72,19 @@ public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K
         new LinkedBlockingQueue<>());
-    // Create team of 20 worker threads with UNLIMITED queue
+    // Create team of worker threads with UNLIMITED queue
     this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
         new LinkedBlockingQueue<>());
-    // Calculate batch size per worker (e.g., 2000 / 20 = 100 keys per batch per worker)
+    // Calculate batch size per worker
     this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
   }
@@ -143,7 +143,7 @@ private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize)
   public void performTaskOnTableVals(String taskName, K startKey, K endKey,
       Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException {
     List<K> bounds = getBounds(startKey, endKey);
-
+    LOG.debug("Length of the bounds - {}", bounds.size());
     // Fallback for small tables (no SST files yet - data only in memtable)
     if (bounds.size() < 2) {
       try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) {
@@ -163,10 +163,10 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
 
     // ===== PARALLEL PROCESSING SETUP =====
 
-    // Queue to track iterator threads (5 threads creating work)
+    // Queue to track iterator threads
     Queue<Future<?>> iterFutures = new LinkedList<>();
-    // Queue to track worker threads (20 threads doing work)
+    // Queue to track worker threads
     Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
 
     AtomicLong keyCounter = new AtomicLong();
@@ -214,7 +214,7 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
           break;
         }
 
-        // If batch is full (2000 keys), stop collecting
+        // If batch is full, stop collecting
         if (keyValues.size() >= maxNumberOfVals) {
          break;
         }
@@ -264,12 +264,11 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
     }
 
     // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
-    // Wait for all 5 iterator threads to finish reading
+    // Wait for all iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
-    // Wait for all 20 worker threads to finish processing
+    // Wait for all worker threads to finish processing
     waitForQueueSize(workerFutures, 0);
 
-    // Log final stats
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
index a1b62acd63d6..3b0764ba3fca 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java
@@ -509,8 +509,7 @@ public void testGetKeysForContainer() throws Exception {
     // Set up test data for FSO keys
     setUpFSOData();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
-        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
     // Reprocess the container key mapper to ensure the latest mapping is used
     reprocessContainerKeyMapper();
@@ -597,8 +596,7 @@ public void testGetKeysForContainerWithPrevKey() throws Exception {
     // Reprocess the container key mapper to ensure the latest mapping is used
     reprocessContainerKeyMapper();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
-        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
 
     response = containerEndpoint.getKeysForContainer(20L, -1, "/0/1/2/file7");
@@ -1855,7 +1853,7 @@ public void testDuplicateFSOKeysForContainerEndpoint() throws Exception {
     setUpDuplicateFSOFileKeys();
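+    // Constructor args after the managers: flush threshold 10, then the new
+    // parallel-reprocess knobs (maxIterators 5, maxWorkers 20, maxKeysInMemory 2000).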
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+            reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
     // Reprocess the container key mappings so that the new keys are loaded.
     reprocessContainerKeyMapper();
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
index 2d375b6fe866..86256a8a3f39 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryDiskUsageOrdering.java
@@ -105,7 +105,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+            reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
index f08a8131d4e0..320494b3fc82 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithFSO.java
@@ -431,8 +431,7 @@ public void setUp() throws Exception {
     populateVolumeThree();
     setUpMultiBlockReplicatedKeys();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
-        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+        new NSSummaryTaskWithFSO(reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
index 146d84b400ef..ecb08c132340 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestNSSummaryEndpointWithOBSAndLegacy.java
@@ -369,7 +369,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithOBS nsSummaryTaskWithOBS =
         new NSSummaryTaskWithOBS(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+            reconOMMetadataManager, 10, 5, 20, 2000);
     nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
     NSSummaryTaskWithLegacy nsSummaryTaskWithLegacy =
         new NSSummaryTaskWithLegacy(reconNamespaceSummaryManager,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index c83501bb7a0d..a0f53f3172d8 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -330,11 +330,9 @@ public void setUp() throws Exception {
         reconNamespaceSummaryManager, reconOMMetadataManager,
         ozoneConfiguration, 10);
     NSSummaryTaskWithOBS nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, 10);
+        reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     NSSummaryTaskWithFSO nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, 10);
+        reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     reconNamespaceSummaryManager.clearNSSummaryTable();
     nSSummaryTaskWithLegacy.reprocessWithLegacy(reconOMMetadataManager);
     nsSummaryTaskWithOBS.reprocessWithOBS(reconOMMetadataManager);
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
index b59aaa3c8b8f..f8dfd43a6701 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenKeysSearchEndpoint.java
@@ -113,7 +113,7 @@ public void setUp() throws Exception {
     populateOMDB();
     NSSummaryTaskWithFSO nSSummaryTaskWithFso =
         new NSSummaryTaskWithFSO(reconNamespaceSummaryManager,
-            reconOMMetadataManager, 10);
+            reconOMMetadataManager, 10, 5, 20, 2000);
     nSSummaryTaskWithFso.reprocessWithFSO(reconOMMetadataManager);
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
index e93f9176c142..133f55486cd1 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithFSO.java
@@ -81,7 +81,7 @@ void setUp(@TempDir File tmpDir) throws Exception {
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
         getReconNamespaceSummaryManager(),
         getReconOMMetadataManager(),
-        threshold);
+        threshold, 5, 20, 2000);
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
index 8d44633ba7ff..3b9d87133345 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTaskWithOBS.java
@@ -60,8 +60,7 @@ false, getBucketLayout(),
         OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
         OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
     nSSummaryTaskWithOBS = new NSSummaryTaskWithOBS(getReconNamespaceSummaryManager(),
-        getReconOMMetadataManager(),
-        threshold);
+        getReconOMMetadataManager(), threshold, 5, 20, 2000);
   }
 
   /**
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTreePrecomputeValues.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTreePrecomputeValues.java
index 3dd691110611..da5535c96ceb 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTreePrecomputeValues.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryTreePrecomputeValues.java
@@ -112,7 +112,7 @@ void setUp(@TempDir File tmpDir) throws Exception {
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
         getReconNamespaceSummaryManager(),
         getReconOMMetadataManager(),
-        threshold);
+        threshold, 5, 20, 2000);
 
     // Populate a custom complex tree structure
     populateComplexTree();
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 4f64d27297b2..92ee52652107 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -154,7 +154,7 @@ private void initializeInjector() throws IOException {
     omTableInsightTask = new OmTableInsightTask(
         reconGlobalStatsManager, reconOMMetadataManager);
     nSSummaryTaskWithFso = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager, reconOMMetadataManager, 10);
+        reconNamespaceSummaryManager, reconOMMetadataManager, 10, 5, 20, 2000);
     dslContext = getDslContext();
     omTableInsightTask.setTables(omTableInsightTask.getTaskTables());