Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ private void publishVersion(Map<Long, Long> partitionVisibleVersions, Map<Long,
LOG.warn("some transaction state need to publish, but no backend exists");
return;
}
traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends);
tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr, partitionVisibleVersions,
backendPartitions);
}

private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates,
List<Long> allBackends) {
long createPublishVersionTaskTime = System.currentTimeMillis();
// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
Expand Down Expand Up @@ -153,60 +160,77 @@ private void publishVersion(Map<Long, Long> partitionVisibleVersions, Map<Long,
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
}

Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
.getPublishVersionTasks()
.values()
.stream()
.peek(task -> {
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
Map<Long, Long> tableIdToDeltaNumRows =
task.getTableIdToDeltaNumRows();
tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
tableIdToTotalDeltaNumRows
.computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows);
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
});
}
});
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
try {
// try to finish the transaction, if failed just retry in next loop
tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
backendPartitions);
} catch (Throwable t) {
LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState,
transactionState.getPublishVersionTasks(), t);
}
} // end for readyTransactionStates
}

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout()
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
private void tryFinishOneTxn(TransactionState transactionState,
SystemInfoService infoService, GlobalTransactionMgr globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
Stream<PublishVersionTask> publishVersionTaskStream = transactionState
.getPublishVersionTasks()
.values()
.stream()
.peek(task -> {
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
Map<Long, Long> tableIdToDeltaNumRows =
task.getTableIdToDeltaNumRows();
tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
tableIdToTotalDeltaNumRows
.computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows);
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
});
}
});
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout()
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
}
}

if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
} // end for readyTransactionStates
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
}
}

private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState transactionState,
Expand Down