
Conversation

@ArafatKhan2198 (Contributor) commented Nov 4, 2025

Summary

Implemented parallel table iteration in Recon snapshot reprocessing to significantly improve performance. Instead of reading tables sequentially with a single iterator, this change uses multiple iterator threads to read different RocksDB SST file segments concurrently, with worker threads processing batches in parallel.
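
The segment boundaries come from RocksDB's live SST file metadata. A rough sketch of that idea (illustrative only, not the PR's actual helper; the class and method names here are hypothetical):

import java.util.ArrayList;
import java.util.List;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;

// Collect the largest key of every live SST file in a column family; these keys
// can serve as split points so that several iterators each scan one segment.
class SstRangeSplitter {
  static List<byte[]> splitBoundaries(RocksDB db, String columnFamily) {
    List<byte[]> boundaries = new ArrayList<>();
    for (LiveFileMetaData file : db.getLiveFilesMetaData()) {
      if (new String(file.columnFamilyName()).equals(columnFamily)) {
        boundaries.add(file.largestKey());
      }
    }
    // In practice the boundaries would be sorted and de-duplicated before use.
    return boundaries;
  }
}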

What Changed

  • ContainerKeyMapperHelper - Parallel iteration for FSO and OBS tables
  • FileSizeCountTaskHelper - Parallel iteration for FSO and OBS tables
  • OmTableInsightTask - Parallel counting for simple tables, sequential with keyIterator() for tables with non-String keys (dTokenTable, s3SecretTable)

Key Implementation Details

  • 2-Tier Thread Pool: 5 iterator threads read from different SST file segments based on LiveFileMetaData boundaries, creating batches. 20 worker threads (or 2 for simple counting) process these batches.
  • Lockless Design: Each worker gets its own map via ConcurrentHashMap.computeIfAbsent(Thread.currentThread().getId()) - no read/write locks are needed for map updates (see the sketch after this list)
  • Flush Coordination: Workers flush independently when their map reaches the threshold (200k / 20 = 10k per worker); only the DB write operation is synchronized
  • Cross-Task Synchronization: ContainerKeyMapperTask uses shared ConcurrentHashMap<Long, AtomicLong> because FSO and OBS tasks write to the same container IDs. FileSizeCountTask doesn't need shared state (FSO and OBS write different bucket keys).
  • Configuration: Added 4 tunable config keys with defaults optimized for common workloads:
    • ozone.recon.task.reprocess.max.iterators = 5
    • ozone.recon.task.reprocess.max.workers = 20
    • ozone.recon.task.reprocess.max.keys.in.memory = 2000
    • ozone.recon.filesizecount.flush.db.max.threshold = 200000
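
A minimal sketch of the lockless per-worker map and flush pattern described above (illustrative only; PerWorkerAccumulator, DbWriter, and writeBatch are hypothetical names, not the Recon classes):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PerWorkerAccumulator<K, V> {

  interface DbWriter<K, V> {
    void writeBatch(Map<K, V> batch) throws Exception;
  }

  private final ConcurrentHashMap<Long, Map<K, V>> allLocalMaps = new ConcurrentHashMap<>();
  private final Object flushLock = new Object();
  private final int perWorkerThreshold;
  private final DbWriter<K, V> dbWriter;

  PerWorkerAccumulator(int perWorkerThreshold, DbWriter<K, V> dbWriter) {
    this.perWorkerThreshold = perWorkerThreshold;
    this.dbWriter = dbWriter;
  }

  void accumulate(K key, V value) throws Exception {
    // Each worker thread gets its own map keyed by thread ID, so puts need no read/write lock.
    Map<K, V> workerMap = allLocalMaps.computeIfAbsent(
        Thread.currentThread().getId(), id -> new HashMap<>());
    workerMap.put(key, value);

    // Flush independently once this worker's share of the threshold is reached;
    // only the DB write itself is synchronized.
    if (workerMap.size() >= perWorkerThreshold) {
      synchronized (flushLock) {
        dbWriter.writeBatch(workerMap);
      }
      workerMap.clear();
    }
  }
}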

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/HDDS-12607

How was this patch tested?

Performance Results

Tested on OM RocksDB with 104M keys and 5.4M directories:

| Task | Keys Processed | Duration | Throughput |
| --- | --- | --- | --- |
| ContainerKeyMapperTaskFSO | 104,408,884 | 1,400 s | ≈ 74,578 keys/sec |
| FileSizeCountTaskFSO | 104,408,884 | 205.4 s | ≈ 508,300 keys/sec |
| OmTableInsightTask | All the tables | 69.3 s | N/A |
| FileSizeCountTaskOBS | 5,437,734 | 14.7 s | ≈ 369,900 keys/sec |
| ContainerKeyMapperTaskOBS | 5,437,734 | 59.8 s | ≈ 90,900 keys/sec |

@sumitagrawl (Contributor) commented:

Share some performance data showing the improvement from the parallel reprocess.

@sumitagrawl (Contributor) left a comment:

@ArafatKhan2198 Due to recent features and an old issue, data is not updated in a synchronized way. Perhaps we can change the model to producer (table iterator with decode) ==> consumer (a single thread that updates NSSummary), instead of taking and releasing locks multiple times, and thereby avoid starvation.

@devmadhuu (Contributor) left a comment:

Thanks @ArafatKhan2198 for the patch. I have given some comments. Also, the PR description and stated objective are confusing.

We are not parallelizing the Recon tasks themselves; rather, we are improving each Recon task's full snapshot reprocessing by processing all the SST files of the RocksDB table that the task needs in a concurrent fashion, instead of iterating that table's DB keys sequentially. Please correct the PR description to avoid confusion. The Recon tasks themselves were already running in parallel before this PR change.

@devmadhuu (Contributor) left a comment:

@ArafatKhan2198, please check; a few more comments.

this.maxIteratorTasks = 2 * iteratorCount;
this.maxWorkerTasks = workerCount * 2;
this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(iteratorCount * 2));

Contributor:

This has a potential issue: if the queue fills up, there is no rejection policy (RejectedExecutionHandler) defined, and the resulting RejectedExecutionException would crash Recon.

Contributor Author:

Implemented with CallerRunsPolicy. Added rejection policy to both thread pools:
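
For reference, a minimal sketch of wiring CallerRunsPolicy into such an executor (illustrative only, not the exact patch):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyExample {
  static ThreadPoolExecutor newIteratorExecutor(int iteratorCount) {
    // With CallerRunsPolicy, a full queue makes the submitting thread run the task
    // itself, applying back-pressure instead of throwing RejectedExecutionException.
    return new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(iteratorCount * 2),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }
}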


try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, 100000)) {

Contributor:

Where has the flush logic gone? Why was it removed?

Contributor Author:

I had removed it by mistake; I will add it back.

reconContainerMetadataManager)) {
LOG.error("Failed to flush container key data for {}", taskName);
return false;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

Contributor:

Use fair locking, else a thread-starvation issue can happen.

  • ReentrantReadWriteLock is non-fair by default
  • With 20 worker threads continuously acquiring read locks, write lock requests can be starved indefinitely
  • When containerKeyMap reaches threshold, threads need write lock to flush, but may wait forever

Scenario:
Thread 1: [Read Lock] -> processing key 1
Thread 2: [Read Lock] -> processing key 2
Thread 3: [Read Lock] -> processing key 3
...
Thread 20: [Read Lock] -> processing key 20
Thread 21: [Waiting Write Lock] <- BLOCKED! (containerKeyMap full, needs to flush)
Thread 1: [Read Lock] -> processing key 21 (lock released and reacquired)
Thread 2: [Read Lock] -> processing key 22
... Thread 21 STILL WAITING
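
For reference, fairness is just the constructor flag (sketch only):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class FairLockExample {
  // fair = true hands the lock out in roughly arrival order, so a waiting writer
  // cannot be starved by a continuous stream of readers (at some throughput cost).
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
}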

@ArafatKhan2198 (Contributor Author) commented Nov 10, 2025:

Evaluated but decided against due to performance impact. I initially implemented fair locking (new ReentrantReadWriteLock(true)), but observed 60%+ performance degradation in testing (throughput dropped from ~60K keys/sec to ~7K keys/sec).

Alternative mitigation implemented:

AtomicBoolean isFlushingInProgress = new AtomicBoolean(false);

if (map.size() >= threshold && isFlushingInProgress.compareAndSet(false, true)) {
  try {
    // Only ONE thread attempts the flush
  } finally {
    // reset the flag so later flushes can still run
    isFlushingInProgress.set(false);
  }
}

This flag-based coordination prevents the mass queueing scenario while maintaining performance. In our 8.7M key workload, flushing happens ~58 times total (every 150K keys), making starvation statistically unlikely. Open to revisiting if starvation is observed in production.

@ArafatKhan2198 (Contributor Author) commented Nov 10, 2025:

Thank you for the thorough review, @devmadhuu!
Please re-review and let me know if any more changes are needed.

@ArafatKhan2198 (Contributor Author) commented Nov 10, 2025:

I tested on an OM DB with 8.7 million keys and 5.6 million directories, totaling about 1.2 TB of data.

| Task | Bucket/Layout | Items | Duration (s) | Throughput (/s) | Notes |
| --- | --- | --- | --- | --- | --- |
| ContainerKeyMapper | FSO | 8,749,130 keys | 215.22 | 40,651 | |
| FileSizeCount | FSO | 8,749,130 keys | 40.72 | 214,887 | |
| ContainerKeyMapper | OBS | 680,167 keys | 20.81 | 32,685 | |
| FileSizeCount | OBS | 680,167 keys | 4.26 | 159,814 | |
| OmTableInsightTask (handlers) | — | 22 tables | 164.68 | — | 3 parallelized, 19 sequential |

I have also attached the recon logs for verification - recon.log

@devmadhuu (Contributor) left a comment:

@ArafatKhan2198, thanks for improving the patch. Please see some more comments.

try {
lock.writeLock().lock();
try {
if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,

Contributor:

This is triple locking, which increases contention and is not needed; the checkAndCallFlushToDB method also uses static synchronization.

  • write lock → static monitor → RocksDB locks
  • Increases contention
  • Increases latency
  • Increases likelihood of deadlock

Contributor Author:

Thanks, I have made the change.

// Double-check after acquiring write lock
if (fileSizeCountMap.size() >= FLUSH_THRESHOLD) {
LOG.debug("Flushing {} accumulated counts to RocksDB for {}", fileSizeCountMap.size(), taskName);
writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);

Contributor:

Now this is being called by multiple threads on the hot path, and in this method we do a DB read for every key, which could be a performance issue:

Long existingCount = reconFileMetadataManager.getFileSizeCount(key);

That is O(N) DB reads instead of O(1) batch operations.
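
One hedged alternative (sketch only; FileSizeStore, getCount, and writeBatch are hypothetical, not the Recon API): keep pure in-memory deltas on the hot path and resolve existing counts only once per distinct key at flush time.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class FileSizeDeltaBuffer {

  interface FileSizeStore {
    long getCount(String key);                    // single point read
    void writeBatch(Map<String, Long> newCounts); // one batched write
  }

  private final ConcurrentHashMap<String, LongAdder> deltas = new ConcurrentHashMap<>();

  void increment(String fileSizeKey) {
    // Hot path: pure in-memory update, no DB access per processed key.
    deltas.computeIfAbsent(fileSizeKey, k -> new LongAdder()).increment();
  }

  synchronized void flush(FileSizeStore store) {
    Map<String, Long> merged = new HashMap<>();
    // DB reads happen once per distinct key at flush time, not once per processed key.
    deltas.forEach((key, delta) -> merged.put(key, store.getCount(key) + delta.sum()));
    store.writeBatch(merged);
    deltas.clear();
  }
}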

@devmadhuu (Contributor) left a comment:

Thanks @ArafatKhan2198 for improving the patch. The error-handling behavior needs to be checked properly, and there are a few more comments. Please check carefully.

Please add some test coverage:

  1. Concurrent FSO + OBS task execution
  2. Thread pool exhaustion

The items below can be done in a separate PR; you can mention the JIRA for this comment here.
Add metrics for parallel processing
- Worker thread utilization
- Lock wait times
- Queue depths

@sumitagrawl (Contributor) left a comment:

@swamirishi there are a few comments about parallel-iterator correctness when a filter is present; please check.
cc: @ArafatKhan2198

}
if (!keyValues.isEmpty()) {
waitForQueueSize(workerFutures, maxWorkerTasks - 10);
workerFutures.add(valueExecutors.submit(() -> {

Contributor:

workerFutures will wait for the batch being queued, which may add some delay when there are fewer splits. The other way is a blocking worker queue:
  • Producer (multi-threaded, i.e. the iterator executor): a KV is added to the queue as soon as it is identified
  • Consumer: a set of threads waiting on the queue to retrieve and process
  • End point: set a flag when iteration is completed

This might be a simpler implementation that avoids the intermediate wait; see the sketch below.
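
A minimal sketch of that blocking-queue model (illustrative only; class and method names are hypothetical):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class ProducerConsumerSketch<KV> {
  private final BlockingQueue<KV> queue = new ArrayBlockingQueue<>(10_000);
  private final AtomicBoolean iterationDone = new AtomicBoolean(false);

  // Producers (iterator threads) block on put() when the queue is full,
  // giving natural back-pressure without waiting on batches of futures.
  void produce(KV kv) throws InterruptedException {
    queue.put(kv);
  }

  // Called once all iterator threads have finished.
  void finishProducing() {
    iterationDone.set(true);
  }

  // Consumers poll until the producers are done and the queue has drained.
  Runnable consumer(Consumer<KV> process) {
    return () -> {
      try {
        while (true) {
          KV kv = queue.poll(100, TimeUnit.MILLISECONDS);
          if (kv != null) {
            process.accept(kv);
          } else if (iterationDone.get() && queue.isEmpty()) {
            return;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
  }
}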


public static final AtomicBoolean CONTAINER_KEY_TABLES_TRUNCATED = new AtomicBoolean(false);

public static final AtomicBoolean CONTAINER_KEY_COUNT_MAP_INITIALIZED = new AtomicBoolean(false);

Contributor:

No need for this AtomicBoolean.

* cross-task data corruption where FSO and OBS tasks overwrite each other's counts.
*/
private static void initializeSharedContainerCountMapIfNeeded(String taskName) {
synchronized (SHARED_MAP_LOCK) {

Contributor:

Init of the map and clearing it can be done using the truncate lock; rename this lock.

try {
// No deleted container list needed since "reprocess" only has put operations
writeToTheDB(containerKeyMap, containerKeyCountMap, Collections.emptyList(), reconContainerMetadataManager);
writeToTheDB(localContainerKeyMap, sharedContainerKeyCountMap, Collections.emptyList(), reconContainerMetadataManager);

Contributor:

Only the last task that executes needs to flush and clean up the map, else the data can go wrong.

writeCountsToDB(fileSizeCountMap, reconFileMetadataManager);
long writeStartTime = Time.monotonicNow();
// Acquire GLOBAL lock (cross-task) before writing to DB
FILE_COUNT_WRITE_LOCK.writeLock().lock();

Contributor:

FILE_COUNT_LOCK needs to be a local lock.

* Process table in parallel using multiple iterators and workers.
* Only for tables with String keys.
*/
private void processTableInParallel(String tableName, Table<?, ?> table,

Contributor:

No need to pass the table here; get the table with StringCodec as done in the try-catch, and use the same one elsewhere if required.

Contributor Author:

Implemented. Removed table parameter and get table inside method with StringCodec as suggested. Method now takes only tableName and omMetadataManager.

* Calculate logging threshold based on table size.
* Logs progress every 1% of total keys, minimum 1.
*/
private long calculateLoggingThreshold(Table<?, ?> table) {

Contributor:

can pass estimatedCount directly.

Contributor Author:

Implemented. Changed method to take long estimatedCount instead of Table.

* Process table sequentially using raw iterator (no type assumptions).
* Used for tables with non-String keys or as fallback.
*/
private void processTableSequentially(String tableName, Table<?, ?> table) throws IOException {

Contributor:

We can get the table inside using only keyIterator(); there is no need for a value iterator.

Contributor Author:

Implemented. Changed to use table.keyIterator() instead of full iterator.

replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName),
details.getRight());
Table<String, ?> stringTable = (Table<String, ?>) table;
try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator = stringTable.iterator()) {

Contributor:

We can remove the table object; getTableSizeAndCount can get the table object itself and return the count.

Contributor Author:

Implemented. Modified OmTableHandler interface and both implementations (OpenKeysInsightHandler, DeletedKeysInsightHandler) to take tableName and omMetadataManager instead of iterator. Handlers now get table and create iterators themselves.

if (myMap.size() >= PER_WORKER_THRESHOLD) {
synchronized (flushLock) {
writeCountsToDB(myMap, reconFileMetadataManager);
myMap.clear();

Contributor:

rename myMap variable as fileSizeCountMap

Contributor Author:

Implemented. Renamed myMap to workerFileSizeCountMap for clarity

SHARED_CONTAINER_KEY_COUNT_MAP.clear();

// Step 3: Initialize reference counter (2 tasks: FSO + OBS)
ACTIVE_TASK_COUNT.set(2);

Contributor:

Need to increment the count by one for every task under a synchronized lock, and decrement it under the same lock when the task finishes (see the sketch below).
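
A minimal sketch of that reference counting (using an AtomicInteger rather than a synchronized block; illustrative only, not the merged code):

import java.util.concurrent.atomic.AtomicInteger;

class ActiveTaskCounter {
  private static final AtomicInteger ACTIVE_TASKS = new AtomicInteger(0);

  // Each task (FSO, OBS, ...) registers itself when it starts...
  static void taskStarted() {
    ACTIVE_TASKS.incrementAndGet();
  }

  // ...and the task that brings the count back to zero performs the final shared flush.
  static boolean taskFinishedAndIsLast() {
    return ACTIVE_TASKS.decrementAndGet() == 0;
  }
}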

@ArafatKhan2198 (Contributor Author) commented Dec 4, 2025:

Implemented

Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> {
try {
// Get or create this worker's private local map using thread ID
Map<ContainerKeyPrefix, Integer> myLocalMap = allLocalMaps.computeIfAbsent(

Contributor:

rename myLocalMap to containerKeyPrefixMap

Contributor Author:

Renamed myLocalMap to containerKeyPrefixMap for better clarity.

if (!checkAndCallFlushToDB(containerKeyMap, containerKeyFlushToDBMaxThreshold,
// Flush this worker's map when it reaches threshold
if (myLocalMap.size() >= PER_WORKER_THRESHOLD) {
synchronized (flushLock) {

Contributor:

We do not need the lock, as each containerKeyPrefix added to the table is unique.

Contributor Author:

Thanks, I have now removed flushLock from the periodic flush, as each worker flushes unique ContainerKeyPrefix values (containerId + keyPrefix + keyVersion combinations).

long totalContainers = SHARED_CONTAINER_KEY_COUNT_MAP.size();

// Final flush: Shared container count map
if (!flushAndCommitContainerKeyInfoToDB(Collections.emptyMap(), SHARED_CONTAINER_KEY_COUNT_MAP, reconContainerMetadataManager)) {

Contributor:

The flush should be done only at the end, else it can happen twice.

Contributor Author:

Implemented. Moved shared container count map flush inside the if (remainingTasks == 0) block. Now only the last completing task flushes the shared map once, preventing duplicate flushes.

@sumitagrawl (Contributor) left a comment:

I have given a few comments.

@sumitagrawl (Contributor) left a comment:

LGTM

@ArafatKhan2198 marked this pull request as ready for review December 10, 2025 07:04
@ArafatKhan2198 merged commit ad891ec into apache:master Dec 10, 2025
56 checks passed