From 3e54491a5aa2bab4071ce10bdf79d69b228b49b4 Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 9 Jul 2024 15:26:05 +0800 Subject: [PATCH 1/2] fix publish --- .../apache/doris/task/PublishVersionTask.java | 5 + .../transaction/DatabaseTransactionMgr.java | 2 +- .../transaction/PublishVersionDaemon.java | 138 ++++++++++-------- .../doris/transaction/TransactionState.java | 2 +- 4 files changed, 82 insertions(+), 65 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 0cde6de539a675..2a369d0cf4ca6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -106,4 +106,9 @@ public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> tableIdToTable public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() { return tableIdToTabletDeltaRows; } + + @Override + public String toString() { + return super.toString() + ", txnId=" + transactionId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cd2ae90398672f..38444d972295ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2686,7 +2686,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat PublishVersionTask publishVersionTask = null; if (publishVersionTasks != null) { List<PublishVersionTask> matchedTasks = publishVersionTasks.stream() - .filter(t -> t.getTransactionId() == subTransactionId + .filter(t -> t != null && t.getTransactionId() == subTransactionId && t.getPartitionVersionInfos().stream() .anyMatch(s -> s.getPartitionId() == partitionId)) .collect(Collectors.toList()); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index c077f0807105bd..82e954b6a1b3d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -152,81 +152,93 @@ private void genPublishTask(List<Long> allBackends, TransactionState transaction private void tryFinishTxn(List<TransactionState> readyTransactionStates, SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr, Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) { - Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap(); - // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false); - Set<Long> notFinishTaskBe = Sets.newHashSet(); - transactionState.getPublishVersionTasks().forEach((key, tasks) -> { - long beId = key; - for (PublishVersionTask task : tasks) { - if (task.isFinished()) { - calculateTaskUpdateRows(tableIdToTabletDeltaRows, task); - } else { - if (infoService.checkBackendAlive(task.getBackendId())) { - hasBackendAliveAndUnfinishedTask.set(true); - } - notFinishTaskBe.add(beId); + try { + // try to finish the transaction, if failed just retry in next loop + tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions, + backendPartitions); + } catch (Throwable t) { + LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState, + transactionState.getPublishVersionTasks(), t); + } + } // end for readyTransactionStates + } + + private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService, + GlobalTransactionMgrIface globalTransactionMgr, + Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) { + Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = 
Maps.newHashMap(); + AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false); + Set<Long> notFinishTaskBe = Sets.newHashSet(); + transactionState.getPublishVersionTasks().forEach((key, tasks) -> { + long beId = key; + for (PublishVersionTask task : tasks) { + if (task.isFinished()) { + calculateTaskUpdateRows(tableIdToTabletDeltaRows, task); + } else { + if (infoService.checkBackendAlive(task.getBackendId())) { + hasBackendAliveAndUnfinishedTask.set(true); } + notFinishTaskBe.add(beId); } - }); + } + }); - transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows); - if (LOG.isDebugEnabled()) { - LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState); + transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows); + if (LOG.isDebugEnabled()) { + LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState); + } + boolean isPublishSlow = false; + long totalNum = transactionState.getPublishVersionTasks().keySet().size(); + boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> { + Backend be = infoService.getBackend(beId); + if (be == null) { + return false; } - boolean isPublishSlow = false; - long totalNum = transactionState.getPublishVersionTasks().keySet().size(); - boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> { - Backend be = infoService.getBackend(beId); - if (be == null) { - return false; - } - return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number; - }); - if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) { - if (LOG.isDebugEnabled()) { - LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}", - totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(), - notFinishTaskBe); - } - isPublishSlow = true; + return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number; + }); + if (totalNum - notFinishTaskBe.size() 
> totalNum / 2 && allUnFinishTaskIsSlow) { + if (LOG.isDebugEnabled()) { + LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}", + totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(), + notFinishTaskBe); } + isPublishSlow = true; + } - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout() - || isPublishSlow - || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); - if (shouldFinishTxn) { - try { - // one transaction exception should not affect other transaction - globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - } catch (Exception e) { - LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); - } - if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { - // if finish transaction state failed, then update publish version time, should check - // to finish after some interval - transactionState.updateSendTaskTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("publish version for transaction {} failed", transactionState); - } + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout() + || isPublishSlow + || DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks"); + if (shouldFinishTxn) { + try { + // one transaction exception should not affect other transaction + globalTransactionMgr.finishTransaction(transactionState.getDbId(), + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); + } catch (Exception e) { + LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); + } + if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) { + // if finish transaction state failed, then update publish version time, should check + // to finish after some 
interval + transactionState.updateSendTaskTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("publish version for transaction {} failed", transactionState); } } + } - if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - transactionState.getPublishVersionTasks().values().forEach(tasks -> { - for (PublishVersionTask task : tasks) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } - }); - transactionState.pruneAfterVisible(); - if (MetricRepo.isInit) { - long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); - MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + transactionState.getPublishVersionTasks().values().forEach(tasks -> { + for (PublishVersionTask task : tasks) { + AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); } + }); + transactionState.pruneAfterVisible(); + if (MetricRepo.isInit) { + long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); } - } // end for readyTransactionStates + } } // Merge task tablets update rows to tableToTabletsDelta. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 74396538492a48..fc0931737dc862 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -389,7 +389,7 @@ public void setErrorReplicas(Set<Long> newErrorReplicas) { } public void addPublishVersionTask(Long backendId, PublishVersionTask task) { - if (this.subTxnIdToTableCommitInfo.isEmpty()) { + if (this.subTxnIds == null) { this.publishVersionTasks.put(backendId, Lists.newArrayList(task)); } else { this.publishVersionTasks.computeIfAbsent(backendId, k -> Lists.newArrayList()).add(task); From 15f2620cf1e34f5dd202d75c9fdca16b1b673d4f Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 9 Jul 2024 16:31:52 +0800 Subject: [PATCH 2/2] fix --- .../transaction/DatabaseTransactionMgr.java | 17 ++--------------- .../doris/transaction/TransactionState.java | 2 +- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 38444d972295ca..915351246e35fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1475,12 +1475,7 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt } // persist transactionState unprotectUpsertTransactionState(transactionState, false); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. 
- for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } + transactionState.setInvolvedBackends(totalInvolvedBackends); } private PartitionCommitInfo generatePartitionCommitInfo(OlapTable table, long partitionId, long partitionVersion) { @@ -1508,12 +1503,7 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S } // persist transactionState unprotectUpsertTransactionState(transactionState, false); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. - for (long backendId : totalInvolvedBackends) { - transactionState.addPublishVersionTask(backendId, null); - } + transactionState.setInvolvedBackends(totalInvolvedBackends); } private void checkBeforeUnprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds) { @@ -1581,9 +1571,6 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S } // persist transactionState unprotectUpsertTransactionState(transactionState, false); - - // add publish version tasks. set task to null as a placeholder. - // tasks will be created when publishing version. transactionState.setInvolvedBackends(totalInvolvedBackends); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index fc0931737dc862..394e7ca4bf709a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -822,7 +822,7 @@ public String getErrMsg() { public void pruneAfterVisible() { publishVersionTasks.clear(); tableIdToTabletDeltaRows.clear(); - // TODO if subTransactionStates can be cleared? + involvedBackends.clear(); } public void setSchemaForPartialUpdate(OlapTable olapTable) {