Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/en/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ There are two ways to configure FE configuration items:

## Configurations

### `agent_task_resend_wait_time_ms`

This configuration determines whether an agent task may be resent when the task's `create_time` is set: ReportHandler resends the agent task only when `current_time - create_time > agent_task_resend_wait_time_ms`.

This configuration is currently mainly used to solve the problem of repeated sending of `PUBLISH_VERSION` agent tasks. The current default value of this configuration is 5000, which is an experimental value.

Because there is a delay between submitting an agent task to AgentTaskQueue and submitting it to BE, increasing the value of this configuration can effectively avoid repeated sending of agent tasks.

However, it also lengthens the time before an agent task that failed to be submitted, or failed to execute, is sent again.

### `alter_table_timeout_second`

### `async_load_task_pool_size`
Expand Down
8 changes: 8 additions & 0 deletions docs/zh-CN/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ FE 的配置项有两种方式进行配置:

## 配置项列表

### `agent_task_resend_wait_time_ms`

当代理任务的创建时间被设置的时候，此配置将决定是否重新发送代理任务，当且仅当当前时间减去创建时间大于 `agent_task_resend_wait_time_ms` 时，ReportHandler可以重新发送代理任务。

该配置目前主要用来解决`PUBLISH_VERSION`代理任务的重复发送问题, 目前该配置的默认值是5000,是个实验值,由于把代理任务提交到代理任务队列和提交到be存在一定的时间延迟,所以调大该配置的值可以有效解决代理任务的重复发送问题,

但同时会导致提交失败或者执行失败的代理任务再次被执行的时间延长。

### `alter_table_timeout_second`

### `async_load_task_pool_size`
Expand Down
31 changes: 22 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ public enum ReplicaStatus {
private long backendId;
// the version could be queried
@SerializedName(value = "version")
private long version;
private volatile long version;
@SerializedName(value = "versionHash")
private long versionHash;
private int schemaHash = -1;
@SerializedName(value = "dataSize")
private long dataSize = 0;
private volatile long dataSize = 0;
@SerializedName(value = "rowCount")
private long rowCount = 0;
private volatile long rowCount = 0;
@SerializedName(value = "state")
private ReplicaState state;
private volatile ReplicaState state;

// the last load failed version
@SerializedName(value = "lastFailedVersion")
Expand Down Expand Up @@ -318,12 +318,23 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
long lastFailedVersion, long lastFailedVersionHash,
long lastSuccessVersion, long lastSuccessVersionHash,
long newDataSize, long newRowCount) {
LOG.debug("before update: {}", this.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("before update: {}", this.toString());
}

if (newVersion < this.version) {
// yiguolei: could not find any reason why new version less than this.version should run???
LOG.warn("replica {} on backend {}'s new version {} is lower than meta version {}",
id, backendId, newVersion, this.version);
// This case means that the replica meta version has already been updated by ReportHandler.
// For example, the publish version daemon has sent publish version tasks for versions 2, 3, 4, 5, 6 to one BE,
// and that BE has finished all of them, so the replica version on that BE is now 6. However, the publish version daemon
// has to wait for the other BEs to finish most of their publish version tasks before it updates the replica version in FE.
// At that moment the replica version in FE is 4. When ReportHandler syncs tablets, it finds that the replica version reported
// by the BE is 6 and sets the replica version in FE to 6. Then the publish version daemon tries to finish the txn and uses the
// visible version (5) to update the replica. Finally, it finds that the new version (5) is lower than the replica version (6) in FE.
if (LOG.isDebugEnabled()) {
LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {},"
+ "not to continue to update replica", id, backendId, newVersion, this.version);
}
return;
}

this.version = newVersion;
Expand Down Expand Up @@ -383,7 +394,9 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
}
}

LOG.debug("after update {}", this.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("after update {}", this.toString());
}
}

public synchronized void updateLastFailedVersion(long lastFailedVersion, long lastFailedVersionHash) {
Expand Down
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1117,5 +1117,13 @@ public class Config extends ConfigBase {
*/
@ConfField
public static String thrift_server_type = ThriftServer.THREAD_POOL;

/**
* This config decides whether an agent task may be resent when create_time for the agent task is set:
* ReportHandler resends the agent task only when current_time - create_time > agent_task_resend_wait_time_ms.
*/
@ConfField (mutable = true, masterOnly = true)
public static long agent_task_resend_wait_time_ms = 5000;

}

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Log4jConfig extends XmlConfiguration {
" <Appenders>\n" +
" <RollingFile name=\"Sys\" fileName=\"${sys_log_dir}/fe.log\" filePattern=\"${sys_log_dir}/fe.log.${sys_file_pattern}-%i\">\n" +
" <PatternLayout charset=\"UTF-8\">\n" +
" <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid [%C{1}.%M():%L] %m%n</Pattern>\n" +
" <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) [%C{1}.%M():%L] %m%n</Pattern>\n" +
" </PatternLayout>\n" +
" <Policies>\n" +
" <TimeBasedTriggeringPolicy/>\n" +
Expand All @@ -57,7 +57,7 @@ public class Log4jConfig extends XmlConfiguration {
" </RollingFile>\n" +
" <RollingFile name=\"SysWF\" fileName=\"${sys_log_dir}/fe.warn.log\" filePattern=\"${sys_log_dir}/fe.warn.log.${sys_file_pattern}-%i\">\n" +
" <PatternLayout charset=\"UTF-8\">\n" +
" <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid [%C{1}.%M():%L] %m%n</Pattern>\n" +
" <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) [%C{1}.%M():%L] %m%n</Pattern>\n" +
" </PatternLayout>\n" +
" <Policies>\n" +
" <TimeBasedTriggeringPolicy/>\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,20 @@ private static void taskReport(long backendId, Map<TTaskType, Set<Long>> running
LOG.info("begin to handle task report from backend {}", backendId);
long start = System.currentTimeMillis();

for (TTaskType type : runningTasks.keySet()) {
Set<Long> taskSet = runningTasks.get(type);
if (!taskSet.isEmpty()) {
String signatures = StringUtils.join(taskSet, ", ");
LOG.debug("backend task[{}]: {}", type.name(), signatures);
if (LOG.isDebugEnabled()) {
for (TTaskType type : runningTasks.keySet()) {
Set<Long> taskSet = runningTasks.get(type);
if (!taskSet.isEmpty()) {
String signatures = StringUtils.join(taskSet, ", ");
LOG.debug("backend task[{}]: {}", type.name(), signatures);
}
}
}

List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks);

AgentBatchTask batchTask = new AgentBatchTask();
long taskReportTime = System.currentTimeMillis();
for (AgentTask task : diffTasks) {
// these tasks no need to do diff
// 1. CREATE
Expand All @@ -350,7 +353,12 @@ private static void taskReport(long backendId, Map<TTaskType, Set<Long>> running
|| task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
continue;
}
batchTask.addTask(task);

// avoid sending duplicate agent tasks to BE
if (task.shouldResend(taskReportTime)) {
batchTask.addTask(task);
}

}

LOG.debug("get {} diff task(s) to resend", batchTask.getTaskNum());
Expand Down Expand Up @@ -742,10 +750,11 @@ private static void handleMigration(ListMultimap<TStorageMedium, Long> tabletMet
private static void handleRepublishVersionInfo(Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
long backendId) {
AgentBatchTask batchTask = new AgentBatchTask();
long createPublishVersionTaskTime = System.currentTimeMillis();
for (Long dbId : transactionsToPublish.keySet()) {
ListMultimap<Long, TPartitionVersionInfo> map = transactionsToPublish.get(dbId);
for (long txnId : map.keySet()) {
PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId, map.get(txnId));
PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId, map.get(txnId), createPublishVersionTaskTime);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
AgentTaskQueue.addTask(task);
Expand Down
16 changes: 14 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.task;

import org.apache.doris.common.Config;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;

Expand All @@ -39,9 +40,10 @@ public abstract class AgentTask {
// some of are not.
// so whether the task is finished depends on caller's logic, not the value of this member.
protected boolean isFinished = false;
protected long createTime;

public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature, long createTime) {
this.backendId = backendId;
this.signature = signature;
this.taskType = taskType;
Expand All @@ -55,11 +57,17 @@ public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
this.resourceInfo = resourceInfo;

this.failedTimes = 0;
this.createTime = createTime;
}

public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
long dbId, long tableId, long partitionId, long indexId, long tabletId) {
this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, tabletId);
this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, tabletId, -1);
}

/**
 * Creates an agent task with an explicit signature but no creation time.
 *
 * Delegates to the full constructor, passing -1 as createTime. shouldResend() treats a
 * createTime of -1 as "always eligible for resend".
 */
public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature, -1);
}

public long getSignature() {
Expand Down Expand Up @@ -122,6 +130,10 @@ public boolean isFinished() {
return isFinished;
}

/**
 * Decides whether this task may be sent again at the given time.
 *
 * A task whose createTime is -1 is always eligible. Otherwise the task is eligible only
 * when the time elapsed since createTime is strictly greater than
 * Config.agent_task_resend_wait_time_ms.
 *
 * @param currentTimeMillis the time to compare against this task's createTime
 * @return true if the task should be resent, false otherwise
 */
public boolean shouldResend(long currentTimeMillis) {
    // No creation time recorded: the wait-time check does not apply.
    if (createTime == -1) {
        return true;
    }
    long elapsedMillis = currentTimeMillis - createTime;
    return elapsedMillis > Config.agent_task_resend_wait_time_ms;
}

@Override
public String toString() {
return "[" + taskType + "], signature: " + signature + ", backendId: " + backendId + ", tablet id: " + tabletId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class PublishVersionTask extends AgentTask {
private boolean isFinished;

public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId);
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime);
this.transactionId = transactionId;
this.partitionVersionInfos = partitionVersionInfos;
this.errorTablets = new ArrayList<Long>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,9 +721,8 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
+ " and its version not equal to partition commit version or commit version - 1"
+ " if its not a upgrate stage, its a fatal error. ", transactionState, replica);
}
} else if (replica.getVersion() == partitionCommitInfo.getVersion()
&& replica.getVersionHash() == partitionCommitInfo.getVersionHash()) {
// the replica's version and version hash is equal to current transaction partition's version and version hash
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
// the replica's version is larger than or equal to current transaction partition's version
// the replica is normal, then remove it from error replica ids
errorReplicaIds.remove(replica.getId());
++healthReplicaNum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private void publishVersion() throws UserException {
LOG.warn("some transaction state need to publish, but no backend exists");
return;
}
long createPublishVersionTaskTime = System.currentTimeMillis();
// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
// traverse all ready transactions and dispatch the publish version task to all backends
Expand Down Expand Up @@ -113,7 +114,8 @@ private void publishVersion() throws UserException {
PublishVersionTask task = new PublishVersionTask(backendId,
transactionState.getTransactionId(),
transactionState.getDbId(),
partitionVersionInfos);
partitionVersionInfos,
createPublishVersionTaskTime);
// add to AgentTaskQueue for handling finish report.
// not check return value, because the add will success
AgentTaskQueue.addTask(task);
Expand Down