diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 22ca57f2399cfe..a1861fb7f4d023 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -91,6 +91,13 @@ private void publishVersion(Map partitionVisibleVersions, Map readyTransactionStates, + List allBackends) { long createPublishVersionTaskTime = System.currentTimeMillis(); // every backend-transaction identified a single task AgentBatchTask batchTask = new AgentBatchTask(); @@ -153,60 +160,77 @@ private void publishVersion(Map partitionVisibleVersions, Map tableIdToTotalDeltaNumRows = Maps.newHashMap(); + private void tryFinishTxn(List readyTransactionStates, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map partitionVisibleVersions, Map> backendPartitions) { // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - Stream publishVersionTaskStream = transactionState - .getPublishVersionTasks() - .values() - .stream() - .peek(task -> { - if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { - Map tableIdToDeltaNumRows = - task.getTableIdToDeltaNumRows(); - tableIdToDeltaNumRows.forEach((tableId, numRows) -> { - tableIdToTotalDeltaNumRows - .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); - tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); - }); - } - }); - boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream - .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); - transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + try { + // try to finish the transaction, if failed just retry in next loop + tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } catch (Throwable t) { + LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState, + transactionState.getPublishVersionTasks(), t); + } + } // end for readyTransactionStates + } - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() - || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); - if (shouldFinishTxn) { - try { - // one transaction exception should not affect other transaction - globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - } catch (Exception e) { - LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); - } - if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { - // if finish transaction state failed, then update publish version time, should check - // to finish after some interval - transactionState.updateSendTaskTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("publish version for transaction {} failed", transactionState); + private void tryFinishOneTxn(TransactionState transactionState, + SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr, + Map partitionVisibleVersions, Map> backendPartitions) { + Map tableIdToTotalDeltaNumRows = Maps.newHashMap(); + Stream publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToTotalDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); + tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); + }); } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream + .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout() + || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); + if (shouldFinishTxn) { + try { + // one transaction exception should not affect other transaction + globalTransactionMgr.finishTransaction(transactionState.getDbId(), + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); + } catch (Exception e) { + LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); + } + if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { + // if finish transaction state failed, then update publish version time, should check + // to finish after some interval + transactionState.updateSendTaskTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("publish version for transaction {} failed", transactionState); } } + } - if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - transactionState.pruneAfterVisible(); - if (MetricRepo.isInit) { - long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); - MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); - } + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); } - } // end for readyTransactionStates + transactionState.pruneAfterVisible(); + if (MetricRepo.isInit) { + long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + } + } } private Map> getBaseTabletIdsForEachBe(TransactionState transactionState,