Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ AgentStatus EngineCloneTask::_clone_copy(
std::string remote_url_prefix;
{
// TODO(zc): if snapshot path has been returned from source, it is some strange to
// concat talbet_id and schema hash here.
// concat tablet_id and schema hash here.
std::stringstream ss;
ss << "http://" << src.host << ":" << src.http_port
<< HTTP_REQUEST_PREFIX
Expand Down Expand Up @@ -468,7 +468,7 @@ Status EngineCloneTask::_download_files(
RETURN_IF_ERROR(FileUtils::remove_all(local_path));
RETURN_IF_ERROR(FileUtils::create_dir(local_path));

// Get remove dir file list
// Get remote dir file list
string file_list_str;
auto list_files_cb = [&remote_url_prefix, &file_list_str] (HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_url_prefix));
Expand Down
18 changes: 18 additions & 0 deletions docs/en/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ This variable is a dynamic configuration, and users can modify the configuration

### `max_bytes_per_broker_scanner`

### `max_clone_task_timeout_sec`

Type: long
Description: Used to control the maximum timeout of a clone task. The unit is second.
Default value: 7200
Dynamic modification: yes

Can cooperate with `mix_clone_task_timeout_sec` to control the maximum and minimum timeout of a clone task. Under normal circumstances, the timeout of a clone task is estimated by the amount of data and the minimum transfer rate (5MB/s). In some special cases, these two configurations can be used to set the upper and lower bounds of the clone task timeout to ensure that the clone task can be completed successfully.

### `max_connection_scheduler_threads_num`

### `max_conn_per_user`
Expand Down Expand Up @@ -455,6 +464,15 @@ This configuration is specifically used to limit timeout setting for stream load

### `min_bytes_per_broker_scanner`

### `min_clone_task_timeout_sec`

Type: long
Description: Used to control the minimum timeout of a clone task. The unit is second.
Default value: 120
Dynamic modification: yes

See the description of `max_clone_task_timeout_sec`.

### `mini_load_default_timeout_second`

### `min_load_timeout_second`
Expand Down
18 changes: 18 additions & 0 deletions docs/zh-CN/administrator-guide/config/fe_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ FE 的配置项有两种方式进行配置:

### `max_bytes_per_broker_scanner`

### `max_clone_task_timeout_sec`

类型:long
说明:用于控制一个 clone 任务的最大超时时间。单位秒。
默认值:7200
动态修改:是

可以配合 `mix_clone_task_timeout_sec` 来控制一个 clone 任务最大和最小的超时间。正常情况下,一个 clone 任务的超时间是通过数据量和最小传输速率(5MB/s)估算的。而在某些特殊情况下,可以通过这两个配置来认为设定 clone 任务超时时间的上下界,以保证 clone 任务可以顺利完成。

### `max_connection_scheduler_threads_num`

### `max_create_table_timeout_second`
Expand Down Expand Up @@ -451,6 +460,15 @@ current running txns on db xxx is xx, larger than limit xx

### `min_bytes_per_broker_scanner`

### `min_clone_task_timeout_sec`

类型:long
说明:用于控制一个 clone 任务的最小超时时间。单位秒。
默认值:120
动态修改:是

见 `max_clone_task_timeout_sec` 说明。

### `mini_load_default_timeout_second`

### `min_load_timeout_second`
Expand Down
26 changes: 16 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -83,11 +84,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
private static final long MAX_NOT_BEING_SCHEDULED_INTERVAL_MS = 30 * 60 * 1000L; // 30 min

/*
* A clone task timeout is between MIN_CLONE_TASK_TIMEOUT_MS and MAX_CLONE_TASK_TIMEOUT_MS,
* A clone task timeout is between Config.min_clone_task_timeout_sec and Config.max_clone_task_timeout_sec,
* estimated by tablet size / MIN_CLONE_SPEED_MB_PER_SECOND.
*/
private static final long MIN_CLONE_TASK_TIMEOUT_MS = 3 * 60 * 1000L; // 3 min
private static final long MAX_CLONE_TASK_TIMEOUT_MS = 2 * 60 * 60 * 1000L; // 2 hour
private static final long MIN_CLONE_SPEED_MB_PER_SECOND = 5; // 5MB/sec

/*
Expand Down Expand Up @@ -583,10 +582,15 @@ public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWo
setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash());
}

public void releaseResource(TabletScheduler tabletScheduler) {
releaseResource(tabletScheduler, false);
}

/*
* release all resources before finishing this task
* release all resources before finishing this task.
* if reserveTablet is true, the tablet object in this ctx will not be set to null after calling reset().
*/
public void releaseResource(TabletScheduler tabletScheduler) {
public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTablet) {
if (srcReplica != null) {
Preconditions.checkState(srcPathHash != -1);
PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(srcReplica.getBackendId());
Expand Down Expand Up @@ -633,17 +637,19 @@ public void releaseResource(TabletScheduler tabletScheduler) {
}
}

reset();
reset(reserveTablet);
}

// reset to save memory after state is done
private void reset() {
private void reset(boolean reserveTablet) {
/*
* If state is PENDING, these fields will be reset when being rescheduled.
* if state is FINISHED/CANCELLED/TIMEOUT, leave these fields for show.
*/
if (state == State.PENDING) {
this.tablet = null;
if (!reserveTablet) {
this.tablet = null;
}
this.srcReplica = null;
this.srcPathHash = -1;
this.destBackendId = -1;
Expand Down Expand Up @@ -725,8 +731,8 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
private long getApproximateTimeoutMs() {
long tabletSize = getTabletSize();
long timeoutMs = tabletSize / 1024 / 1024 / MIN_CLONE_SPEED_MB_PER_SECOND * 1000;
timeoutMs = Math.max(timeoutMs, MIN_CLONE_TASK_TIMEOUT_MS);
timeoutMs = Math.min(timeoutMs, MAX_CLONE_TASK_TIMEOUT_MS);
timeoutMs = Math.max(timeoutMs, Config.min_clone_task_timeout_sec * 1000);
timeoutMs = Math.min(timeoutMs, Config.max_clone_task_timeout_sec * 1000);
return timeoutMs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,12 +642,40 @@ private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx, AgentBatch
/*
* There are enough alive replicas with complete version in this tablet, but some of backends may
* under decommission.
* This process is same as replica missing
* First, we try to find a version incomplete replica on available BE.
* If failed to find, then try to find a new BE to clone the replicas.
*
* Give examples of why:
* Tablet X has 3 replicas on A, B, C 3 BEs.
* C is decommission, so we choose the BE D to relocating the new replica,
* After relocating, Tablet X has 4 replicas: A, B, C(decommision), D(may be version incomplete)
* But D may be version incomplete because the clone task ran a long time, the new version
* has been published.
* At the next time of tablet checking, Tablet X's status is still REPLICA_RELOCATING,
* If we don't choose D as dest BE to do the new relocating, it will choose new backend E to
* store the new replicas. So back and forth, the number of replicas will increase forever.
*/
private void handleReplicaRelocating(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaUnavailableErr.incrementAndGet();
handleReplicaMissing(tabletCtx, batchTask);
try {
handleReplicaVersionIncomplete(tabletCtx, batchTask);
LOG.info("succeed to find version incomplete replica from tablet relocating. tablet id: {}",
tabletCtx.getTabletId());
} catch (SchedException e) {
if (e.getStatus() == Status.SCHEDULE_FAILED) {
LOG.info("failed to find version incomplete replica from tablet relocating. tablet id: {}, "
+ "try to find a new backend", tabletCtx.getTabletId());
// the dest or src slot may be taken after calling handleReplicaVersionIncomplete(),
// so we need to release these slots first.
// and reserve the tablet in TabletSchedCtx so that it can continue to be scheduled.
tabletCtx.releaseResource(this, true);
handleReplicaMissing(tabletCtx, batchTask);
LOG.info("succeed to find new backend for tablet relocating. tablet id: {}", tabletCtx.getTabletId());
} else {
throw e;
}
}
}

/**
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1149,5 +1149,17 @@ public class Config extends ConfigBase {
@ConfField (mutable = true, masterOnly = true)
public static long agent_task_resend_wait_time_ms = 5000;

/**
* min_clone_task_timeout_sec and max_clone_task_timeout_sec is to limit the
* min and max timeout of a clone task.
* Under normal circumstances, the timeout of a clone task is estimated by
* the amount of data and the minimum transmission speed(5MB/s).
* But in special cases, you may need to manually set these two configs
* to ensure that the clone task will not fail due to timeout.
*/
@ConfField(mutable = true, masterOnly = true)
public static long min_clone_task_timeout_sec = 3 * 60; // 3min
@ConfField(mutable = true, masterOnly = true)
public static long max_clone_task_timeout_sec = 2 * 60 * 60; // 2h
}

Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, L
// 1. PUSH finished in BE but failed or not yet report to FE
// 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
replica.updateVersionInfo(backendVersion, backendVersionHash, dataSize, rowCount);
replica.setBad(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer:
In the original design logic, when the status of a replica is bad, it cannot become not bad again.
This is a clerical error and will cause the admin set replica status function to fail.


if (replica.getLastFailedVersion() < 0 && !isInitVersion) {
// last failed version < 0 means this replica becomes health after sync,
Expand Down