From cb5a5c8466a7a8eaa3f0e765e8af3cedd354b8e4 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 27 Oct 2025 18:54:57 +0800 Subject: [PATCH] [fix](tablet report)Replace tablet report with ForkJoinPool --- .../java/org/apache/doris/common/Config.java | 9 + .../doris/catalog/TabletInvertedIndex.java | 518 +++++++++++++----- 2 files changed, 375 insertions(+), 152 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a49d3b941e4419..6b75c1cbe5d4ff 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -562,6 +562,15 @@ public class Config extends ConfigBase { "Whether to enable parallel publish version"}) public static boolean enable_parallel_publish_version = true; + + @ConfField(masterOnly = true, description = {"Tablet report 线程池的数目", + "Num of thread to handle tablet report task"}) + public static int tablet_report_thread_pool_num = 10; + + @ConfField(masterOnly = true, description = {"Tablet report 线程池的队列大小", + "Queue size to store tablet report task in publish thread pool"}) + public static int tablet_report_queue_size = 1024; + @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。" + "该参数仅用于事务型 insert 操作中。", "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 1b37e7f3a6ca3f..d90a7861662269 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -56,7 +56,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; @@ -104,7 +108,14 @@ public class TabletInvertedIndex { // Notice only none-cloud use it for be reporting tablets. This map is empty in cloud mode. private volatile ImmutableMap partitionCollectInfoMap = ImmutableMap.of(); - private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + private final ExecutorService taskPool = new ThreadPoolExecutor( + Config.tablet_report_thread_pool_num, + 2 * Config.tablet_report_thread_pool_num, + // tablet report task default 60s once + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(Config.tablet_report_queue_size), + new ThreadPoolExecutor.DiscardOldestPolicy()); public TabletInvertedIndex() { } @@ -143,178 +154,381 @@ public void tabletReport(long backendId, Map backendTablets, long feTabletNum = 0; long stamp = readLock(); long start = System.currentTimeMillis(); + try { if (LOG.isDebugEnabled()) { LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size()); } + Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { feTabletNum = replicaMetaWithBackend.size(); - taskPool.submit(() -> { - // traverse replicas in meta with this backend - replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { - long tabletId = entry.getKey(); - Preconditions.checkState(tabletMetaMap.containsKey(tabletId), - "tablet " + tabletId + " not exists, backend " + backendId); - TabletMeta tabletMeta = tabletMetaMap.get(tabletId); - - if (backendTablets.containsKey(tabletId)) { - TTablet backendTablet = backendTablets.get(tabletId); - Replica replica = entry.getValue(); - tabletFoundInMeta.add(tabletId); - TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0); - TTabletMetaInfo tabletMetaInfo = null; - if (backendTabletInfo.getReplicaId() != replica.getId() - && replica.getState() != ReplicaState.CLONE) { - // Need to update replica id in BE - tabletMetaInfo = new TTabletMetaInfo(); - tabletMetaInfo.setReplicaId(replica.getId()); - } - PartitionCollectInfo partitionCollectInfo = - partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); - boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); - if (isInMemory != backendTabletInfo.isIsInMemory()) { - if (tabletMetaInfo == null) { - tabletMetaInfo = new TTabletMetaInfo(); - tabletMetaInfo.setIsInMemory(isInMemory); - } - } - if (Config.fix_tablet_partition_id_eq_0 - && tabletMeta.getPartitionId() > 0 - && backendTabletInfo.getPartitionId() == 0) { - LOG.warn("be report tablet partition id not eq fe, in be {} but in fe {}", - backendTabletInfo, tabletMeta); - // Need to update partition id in BE - tabletMetaInfo = new TTabletMetaInfo(); - tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId()); - } - // 1. (intersection) - if (needSync(replica, backendTabletInfo)) { - // need sync - synchronized (tabletSyncMap) { - tabletSyncMap.put(tabletMeta.getDbId(), tabletId); - } - } + processTabletReportAsync(backendId, backendTablets, backendPartitionsVersion, storageMediumMap, + tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, + tabletToUpdate, cooldownTablets, replicaMetaWithBackend); + } + } finally { + readUnlock(stamp); + } - // check and set path - // path info of replica is only saved in Master FE - if (backendTabletInfo.isSetPathHash() - && replica.getPathHash() != backendTabletInfo.getPathHash()) { - replica.setPathHash(backendTabletInfo.getPathHash()); - } + // Process cooldown configs outside of read lock to avoid deadlock + cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); - if (backendTabletInfo.isSetSchemaHash() && replica.getState() == ReplicaState.NORMAL - && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { - // update the schema hash only when replica is normal - replica.setSchemaHash(backendTabletInfo.getSchemaHash()); - } + logTabletReportSummary(backendId, feTabletNum, backendTablets, backendPartitionsVersion, + tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, + tabletToUpdate, tabletRecoveryMap, start); + } - if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) { - LOG.warn("replica {} of tablet {} on backend {} need recovery. " - + "replica in FE: {}, report version {}, report schema hash: {}," - + " is bad: {}, is version missing: {}", - replica.getId(), tabletId, backendId, replica, - backendTabletInfo.getVersion(), - backendTabletInfo.getSchemaHash(), - backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", - backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : - "unset"); - synchronized (tabletRecoveryMap) { - tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); - } - } + /** + * Process tablet report asynchronously using thread pool + */ + private void processTabletReportAsync(long backendId, Map backendTablets, + Map backendPartitionsVersion, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets, + Map replicaMetaWithBackend) { + // Calculate optimal chunk size to balance task granularity and concurrency + // For large tablet counts (40W-50W), we want smaller chunks to maximize parallelism + // Target: create at least threadPoolSize * 4 tasks for better load balancing + int totalTablets = replicaMetaWithBackend.size(); + int threadPoolSize = Config.tablet_report_thread_pool_num; + int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads for better load balancing + int chunkSize = Math.max(500, totalTablets / targetTasks); + + // Cap chunk size to avoid too large tasks + // so thread pool queue will not be fulled with few large tasks + int maxChunkSize = 10000; + chunkSize = Math.min(chunkSize, maxChunkSize); + List> entries = new ArrayList<>(replicaMetaWithBackend.entrySet()); + List> tabletFutures = new ArrayList<>(); + int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize; + if (LOG.isDebugEnabled()) { + LOG.debug("Processing tablet report for backend[{}]: total tablets={}, chunkSize={}, estimated tasks={}", + backendId, totalTablets, chunkSize, estimatedTasks); + } - if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) { - // Place tablet info in a container and process it outside of read lock to avoid - // deadlock with OlapTable lock - synchronized (cooldownTablets) { - cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo)); - } - replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); - replica.setCooldownTerm(backendTabletInfo.getCooldownTerm()); - } + for (int i = 0; i < entries.size(); i += chunkSize) { + final int start = i; + final int end = Math.min(i + chunkSize, entries.size()); + + CompletableFuture future = CompletableFuture.runAsync(() -> { + for (int j = start; j < end; j++) { + Map.Entry entry = entries.get(j); + processTabletEntry(backendId, backendTablets, storageMediumMap, tabletSyncMap, + tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + transactionsToPublish, transactionsToClear, tabletRecoveryMap, + tabletToUpdate, cooldownTablets, entry); + } + }, taskPool); - long partitionId = tabletMeta.getPartitionId(); - if (!Config.disable_storage_medium_check) { - // check if need migration - TStorageMedium storageMedium = storageMediumMap.get(partitionId); - if (storageMedium != null && backendTabletInfo.isSetStorageMedium() - && isLocal(storageMedium) && isLocal(backendTabletInfo.getStorageMedium()) - && isLocal(tabletMeta.getStorageMedium())) { - if (storageMedium != backendTabletInfo.getStorageMedium()) { - synchronized (tabletMigrationMap) { - tabletMigrationMap.put(storageMedium, tabletId); - } - } - if (storageMedium != tabletMeta.getStorageMedium()) { - tabletMeta.setStorageMedium(storageMedium); - } - } - } + tabletFutures.add(future); + } - // check if should clear transactions - if (backendTabletInfo.isSetTransactionIds()) { - handleBackendTransactions(backendId, backendTabletInfo.getTransactionIds(), tabletId, - tabletMeta, transactionsToPublish, transactionsToClear); - } // end for txn id - - // update replicase's version count - // no need to write log, and no need to get db lock. - if (backendTabletInfo.isSetTotalVersionCount()) { - replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); - replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() - ? backendTabletInfo.getVisibleVersionCount() - : backendTabletInfo.getTotalVersionCount()); - } - if (tabletMetaInfo != null) { - tabletMetaInfo.setTabletId(tabletId); - synchronized (tabletToUpdate) { - tabletToUpdate.add(tabletMetaInfo); - } - } - } else { - // 2. (meta - be) - // may need delete from meta - if (LOG.isDebugEnabled()) { - LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); - } - synchronized (tabletDeleteFromMeta) { - tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); - } - } - }); - - backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> { - long partitionId = entry.getKey(); - long backendVersion = entry.getValue(); - PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); - if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { - partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); - } - }); - }).join(); + // Process partition versions in parallel + CompletableFuture partitionFuture = CompletableFuture.runAsync(() -> { + processPartitionVersions(backendPartitionsVersion, partitionVersionSyncMap); + }, taskPool); + + // Wait for all tasks to complete + CompletableFuture.allOf(tabletFutures.toArray(new CompletableFuture[0])).join(); + partitionFuture.join(); + } + + /** + * Process a single tablet entry from backend report + */ + private void processTabletEntry(long backendId, Map backendTablets, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets, + Map.Entry entry) { + long tabletId = entry.getKey(); + Replica replica = entry.getValue(); + + Preconditions.checkState(tabletMetaMap.containsKey(tabletId), + "tablet " + tabletId + " not exists, backend " + backendId); + TabletMeta tabletMeta = tabletMetaMap.get(tabletId); + + if (backendTablets.containsKey(tabletId)) { + // Tablet exists in both FE and BE + TTablet backendTablet = backendTablets.get(tabletId); + TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0); + + tabletFoundInMeta.add(tabletId); + + processExistingTablet(backendId, tabletId, replica, tabletMeta, backendTabletInfo, + storageMediumMap, tabletSyncMap, tabletMigrationMap, transactionsToPublish, + transactionsToClear, tabletRecoveryMap, tabletToUpdate, cooldownTablets); + } else { + // Tablet exists in FE but not in BE - may need deletion + processDeletedTablet(backendId, tabletId, tabletMeta, tabletDeleteFromMeta); + } + } + + /** + * Process tablet that exists in both FE and BE + */ + private void processExistingTablet(long backendId, long tabletId, Replica replica, + TabletMeta tabletMeta, TTabletInfo backendTabletInfo, + HashMap storageMediumMap, + ListMultimap tabletSyncMap, + ListMultimap tabletMigrationMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + ListMultimap tabletRecoveryMap, + List tabletToUpdate, + List> cooldownTablets) { + // Check and prepare tablet meta info update + TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica, tabletMeta, backendTabletInfo); + + // Check if version sync is needed + if (needSync(replica, backendTabletInfo)) { + synchronized (tabletSyncMap) { + tabletSyncMap.put(tabletMeta.getDbId(), tabletId); } - } finally { - readUnlock(stamp); } - cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); - long end = System.currentTimeMillis(); + // Update replica path and schema hash + updateReplicaBasicInfo(replica, backendTabletInfo); + + // Check if replica needs recovery + if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) { + logReplicaRecovery(replica, tabletId, backendId, backendTabletInfo); + synchronized (tabletRecoveryMap) { + tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); + } + } + + // Handle cooldown policy + if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) { + synchronized (cooldownTablets) { + cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo)); + } + replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); + replica.setCooldownTerm(backendTabletInfo.getCooldownTerm()); + } + + // Check storage medium migration + checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo, + storageMediumMap, tabletMigrationMap); + + // Handle transactions + if (backendTabletInfo.isSetTransactionIds()) { + handleBackendTransactions(backendId, backendTabletInfo.getTransactionIds(), tabletId, + tabletMeta, transactionsToPublish, transactionsToClear); + } + + // Update replica version count + updateReplicaVersionCount(replica, backendTabletInfo); + + // Add tablet meta info to update list if needed + if (tabletMetaInfo != null) { + tabletMetaInfo.setTabletId(tabletId); + synchronized (tabletToUpdate) { + tabletToUpdate.add(tabletMetaInfo); + } + } + } + + /** + * Prepare tablet meta info for BE update if needed + */ + private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta tabletMeta, + TTabletInfo backendTabletInfo) { + TTabletMetaInfo tabletMetaInfo = null; + + // Check replica id mismatch + if (backendTabletInfo.getReplicaId() != replica.getId() + && replica.getState() != ReplicaState.CLONE) { + tabletMetaInfo = new TTabletMetaInfo(); + tabletMetaInfo.setReplicaId(replica.getId()); + } + + // Check in-memory flag + PartitionCollectInfo partitionCollectInfo = + partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); + boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); + if (isInMemory != backendTabletInfo.isIsInMemory()) { + if (tabletMetaInfo == null) { + tabletMetaInfo = new TTabletMetaInfo(); + } + tabletMetaInfo.setIsInMemory(isInMemory); + } + + // Check partition id mismatch + if (Config.fix_tablet_partition_id_eq_0 + && tabletMeta.getPartitionId() > 0 + && backendTabletInfo.getPartitionId() == 0) { + LOG.warn("be report tablet partition id not eq fe, in be {} but in fe {}", + backendTabletInfo, tabletMeta); + if (tabletMetaInfo == null) { + tabletMetaInfo = new TTabletMetaInfo(); + } + tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId()); + } + + return tabletMetaInfo; + } + + /** + * Update replica's basic info like path hash and schema hash + */ + private void updateReplicaBasicInfo(Replica replica, TTabletInfo backendTabletInfo) { + // Update path hash + if (backendTabletInfo.isSetPathHash() + && replica.getPathHash() != backendTabletInfo.getPathHash()) { + replica.setPathHash(backendTabletInfo.getPathHash()); + } + + // Update schema hash + if (backendTabletInfo.isSetSchemaHash() + && replica.getState() == ReplicaState.NORMAL + && replica.getSchemaHash() != backendTabletInfo.getSchemaHash()) { + replica.setSchemaHash(backendTabletInfo.getSchemaHash()); + } + } + + /** + * Log replica recovery information + */ + private void logReplicaRecovery(Replica replica, long tabletId, long backendId, + TTabletInfo backendTabletInfo) { + LOG.warn("replica {} of tablet {} on backend {} need recovery. " + + "replica in FE: {}, report version {}, report schema hash: {}, " + + "is bad: {}, is version missing: {}", + replica.getId(), tabletId, backendId, replica, + backendTabletInfo.getVersion(), + backendTabletInfo.getSchemaHash(), + backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", + backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset"); + } + + /** + * Check if storage medium migration is needed + */ + private void checkStorageMediumMigration(long tabletId, TabletMeta tabletMeta, + TTabletInfo backendTabletInfo, + HashMap storageMediumMap, + ListMultimap tabletMigrationMap) { + if (Config.disable_storage_medium_check) { + return; + } + + long partitionId = tabletMeta.getPartitionId(); + TStorageMedium storageMedium = storageMediumMap.get(partitionId); + + if (storageMedium != null && backendTabletInfo.isSetStorageMedium() + && isLocal(storageMedium) + && isLocal(backendTabletInfo.getStorageMedium()) + && isLocal(tabletMeta.getStorageMedium())) { + + if (storageMedium != backendTabletInfo.getStorageMedium()) { + synchronized (tabletMigrationMap) { + tabletMigrationMap.put(storageMedium, tabletId); + } + } + + if (storageMedium != tabletMeta.getStorageMedium()) { + tabletMeta.setStorageMedium(storageMedium); + } + } + } + + /** + * Update replica's version count + */ + private void updateReplicaVersionCount(Replica replica, TTabletInfo backendTabletInfo) { + if (backendTabletInfo.isSetTotalVersionCount()) { + replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); + replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() + ? backendTabletInfo.getVisibleVersionCount() + : backendTabletInfo.getTotalVersionCount()); + } + } + + /** + * Process tablet that exists in FE but not reported by BE + */ + private void processDeletedTablet(long backendId, long tabletId, TabletMeta tabletMeta, + ListMultimap tabletDeleteFromMeta) { + if (LOG.isDebugEnabled()) { + LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId, tabletId, tabletMeta); + } + synchronized (tabletDeleteFromMeta) { + tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId); + } + } + + /** + * Process partition versions reported by BE + */ + private void processPartitionVersions(Map backendPartitionsVersion, + Map partitionVersionSyncMap) { + for (Map.Entry entry : backendPartitionsVersion.entrySet()) { + long partitionId = entry.getKey(); + long backendVersion = entry.getValue(); + PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); + + if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { + partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); + } + } + } + + /** + * Log tablet report summary + */ + private void logTabletReportSummary(long backendId, long feTabletNum, + Map backendTablets, + Map backendPartitionsVersion, + ListMultimap tabletSyncMap, + ListMultimap tabletDeleteFromMeta, + Set tabletFoundInMeta, + ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, + Map> transactionsToPublish, + SetMultimap transactionsToClear, + List tabletToUpdate, + ListMultimap tabletRecoveryMap, + long startTime) { + long endTime = System.currentTimeMillis(); long toClearTransactionsNum = transactionsToClear.keySet().size(); long toClearTransactionsPartitions = transactionsToClear.values().size(); long toPublishTransactionsNum = transactionsToPublish.values().stream() - .mapToLong(m -> m.keySet().size()).sum(); + .mapToLong(m -> m.keySet().size()).sum(); long toPublishTransactionsPartitions = transactionsToPublish.values().stream() - .mapToLong(m -> m.values().size()).sum(); - LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. sync: {}." - + " metaDel: {}. foundInMeta: {}. migration: {}. backend partition num: {}, backend need " - + "update: {}. found invalid transactions {}(partitions: {}). found republish " - + "transactions {}(partitions: {}). tabletToUpdate: {}. need recovery: {}. cost: {} ms", + .mapToLong(m -> m.values().size()).sum(); + + LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. " + + "sync: {}, metaDel: {}, foundInMeta: {}, migration: {}, " + + "backend partition num: {}, backend need update: {}, " + + "found invalid transactions {}(partitions: {}), " + + "found republish transactions {}(partitions: {}), " + + "tabletToUpdate: {}, need recovery: {}, cost: {} ms", backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), - backendPartitionsVersion.size(), partitionVersionSyncMap.size(), toClearTransactionsNum, - toClearTransactionsPartitions, toPublishTransactionsNum, toPublishTransactionsPartitions, - tabletToUpdate.size(), tabletRecoveryMap.size(), (end - start)); + backendPartitionsVersion.size(), partitionVersionSyncMap.size(), + toClearTransactionsNum, toClearTransactionsPartitions, + toPublishTransactionsNum, toPublishTransactionsPartitions, + tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime - startTime)); } private void handleBackendTransactions(long backendId, List transactionIds, long tabletId,