Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
}
watch.stop();
if (watch.elapsed_time() > 1000L * 1000L * 1000L) {
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time();
LOG(WARNING) << "heartbeat consume too much time. time=" << watch.elapsed_time()
<< ", host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
List<Replica> tabletSuccReplicas = Lists.newArrayList();
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
List<String> logs = Lists.newArrayList();
TabletsPublishResultLogs logs = new TabletsPublishResultLogs();

Map<Long, List<PublishVersionTask>> publishTasks = transactionState.getPublishVersionTasks();
PublishResult publishResult = PublishResult.QUORUM_SUCC;
Expand Down Expand Up @@ -1396,10 +1396,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
|| now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
if (needLog) {
transactionState.setLastPublishLogTime(now);
for (String log : logs) {
LOG.info("{}. publish times {}, whole txn publish result {}",
log, transactionState.getPublishCount(), publishResult.name());
}
logs.log();
}

return publishResult;
Expand Down Expand Up @@ -2607,7 +2604,7 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
&& now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
allowPublishOneSucc = true;
}
List<String> logs = Lists.newArrayList();
TabletsPublishResultLogs logs = new TabletsPublishResultLogs();

Map<Long, List<PublishVersionTask>> publishTasks = transactionState.getPublishVersionTasks();
PublishResult publishResult = PublishResult.QUORUM_SUCC;
Expand Down Expand Up @@ -2733,31 +2730,67 @@ private PublishResult finishCheckQuorumReplicas(TransactionState transactionStat
|| LOG.isDebugEnabled();
if (needLog) {
transactionState.setLastPublishLogTime(now);
for (String log : logs) {
LOG.info("{}. publish times {}, whole txn publish result {}",
log, transactionState.getPublishCount(), publishResult.name());
}
logs.log();
}

return publishResult;
}

private class TabletsPublishResultLogs {
public List<String> quorumSuccLogs = Lists.newArrayList();
public List<String> timeoutSuccLogs = Lists.newArrayList();
public List<String> failedLogs = Lists.newArrayList();

public void addQuorumSuccLog(String log) {
if (quorumSuccLogs.size() < 16) {
quorumSuccLogs.add(log);
}
}

public void addTimeoutSuccLog(String log) {
if (timeoutSuccLogs.size() < 16) {
timeoutSuccLogs.add(log);
}
}

public void addFailedLog(String log) {
if (failedLogs.size() < 16) {
failedLogs.add(log);
}
}

public void log() {
// log failed logs
for (String log : failedLogs) {
LOG.info(log);
}
// log timeout succ logs
for (String log : timeoutSuccLogs) {
LOG.info(log);
}
// log quorum succ logs
for (String log : quorumSuccLogs) {
LOG.info(log);
}
}
}

private PublishResult checkQuorumReplicas(TransactionState transactionState, long tableId, Partition partition,
Tablet tablet, int loadRequiredReplicaNum, boolean allowPublishOneSucc, long newVersion,
List<Replica> tabletSuccReplicas, List<Replica> tabletWriteFailedReplicas,
List<Replica> tabletVersionFailedReplicas, PublishResult publishResult, List<String> logs) {
List<Replica> tabletVersionFailedReplicas, PublishResult publishResult, TabletsPublishResultLogs logs) {
long partitionId = partition.getId();
int healthReplicaNum = tabletSuccReplicas.size();
if (healthReplicaNum >= loadRequiredReplicaNum) {
boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty();
if (hasFailedReplica) {
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
logs.addQuorumSuccLog(String.format("publish version quorum succ for transaction %s on tablet %s"
+ " with version %s, and has failed replicas, load require replica num %s. "
+ "table %s, partition: [ id=%s, commit version=%s ], tablet detail: %s",
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId, partitionId,
partition.getCommittedVersion(), writeDetail));
transactionState, tablet.getId(), newVersion, loadRequiredReplicaNum, tableId,
partitionId, partition.getCommittedVersion(), writeDetail));
}
return publishResult;
}
Expand All @@ -2777,7 +2810,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon
// that are being publised exists on a few replicas we should go
// ahead, otherwise data may be lost and thre
// publish task hangs forever.
logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
logs.addTimeoutSuccLog(String.format("publish version timeout succ for transaction %s on tablet %s "
+ "with version %s, and has failed replicas, load require replica num %s. "
+ "table %s, partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion,
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
Expand All @@ -2788,7 +2821,7 @@ private PublishResult checkQuorumReplicas(TransactionState transactionState, lon
+ " table: %d, partition: %d, publish version: %d", tablet.getId(), healthReplicaNum,
loadRequiredReplicaNum, tableId, partitionId, newVersion);
transactionState.setErrorMsg(errMsg);
logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
logs.addFailedLog(String.format("publish version failed for transaction %s on tablet %s with version"
+ " %s, and has failed replicas, load required replica num %s. table %s, "
+ "partition %s, tablet detail: %s", transactionState, tablet.getId(), newVersion,
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
Expand Down