Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> tableIdToTable
public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
return tableIdToTabletDeltaRows;
}

@Override
public String toString() {
return super.toString() + ", txnId=" + transactionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1475,12 +1475,7 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

private PartitionCommitInfo generatePartitionCommitInfo(OlapTable table, long partitionId, long partitionVersion) {
Expand Down Expand Up @@ -1508,12 +1503,7 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
for (long backendId : totalInvolvedBackends) {
transactionState.addPublishVersionTask(backendId, null);
}
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

private void checkBeforeUnprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds) {
Expand Down Expand Up @@ -1581,9 +1571,6 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
}
// persist transactionState
unprotectUpsertTransactionState(transactionState, false);

// add publish version tasks. set task to null as a placeholder.
// tasks will be created when publishing version.
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

Expand Down Expand Up @@ -2686,7 +2673,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
PublishVersionTask publishVersionTask = null;
if (publishVersionTasks != null) {
List<PublishVersionTask> matchedTasks = publishVersionTasks.stream()
.filter(t -> t.getTransactionId() == subTransactionId
.filter(t -> t != null && t.getTransactionId() == subTransactionId
&& t.getPartitionVersionInfos().stream()
.anyMatch(s -> s.getPartitionId() == partitionId))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,81 +152,93 @@ private void genPublishTask(List<Long> allBackends, TransactionState transaction
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
calculateTaskUpdateRows(tableIdToTabletDeltaRows, task);
} else {
if (infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
}
notFinishTaskBe.add(beId);
try {
// try to finish the transaction, if failed just retry in next loop
tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
backendPartitions);
} catch (Throwable t) {
LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState,
transactionState.getPublishVersionTasks(), t);
}
} // end for readyTransactionStates
}

private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService,
GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
calculateTaskUpdateRows(tableIdToTabletDeltaRows, task);
} else {
if (infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
}
notFinishTaskBe.add(beId);
}
});
}
});

transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState);
transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState);
}
boolean isPublishSlow = false;
long totalNum = transactionState.getPublishVersionTasks().keySet().size();
boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> {
Backend be = infoService.getBackend(beId);
if (be == null) {
return false;
}
boolean isPublishSlow = false;
long totalNum = transactionState.getPublishVersionTasks().keySet().size();
boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> {
Backend be = infoService.getBackend(beId);
if (be == null) {
return false;
}
return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number;
});
if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) {
if (LOG.isDebugEnabled()) {
LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}",
totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(),
notFinishTaskBe);
}
isPublishSlow = true;
return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number;
});
if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) {
if (LOG.isDebugEnabled()) {
LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}",
totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(),
notFinishTaskBe);
}
isPublishSlow = true;
}

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout()
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout()
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
}
}

if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
} // end for readyTransactionStates
}
}

// Merge task tablets update rows to tableToTabletsDelta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public void setErrorReplicas(Set<Long> newErrorReplicas) {
}

public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
if (this.subTxnIdToTableCommitInfo.isEmpty()) {
if (this.subTxnIds == null) {
this.publishVersionTasks.put(backendId, Lists.newArrayList(task));
} else {
this.publishVersionTasks.computeIfAbsent(backendId, k -> Lists.newArrayList()).add(task);
Expand Down Expand Up @@ -822,7 +822,7 @@ public String getErrMsg() {
public void pruneAfterVisible() {
publishVersionTasks.clear();
tableIdToTabletDeltaRows.clear();
// TODO if subTransactionStates can be cleared?
involvedBackends.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @SerializedName to involvedBackends ?

}

public void setSchemaForPartialUpdate(OlapTable olapTable) {
Expand Down